Lesson 35 - Get Compute Auth Token Working

This commit is contained in:
Norman Lansing
2026-02-28 12:32:28 -05:00
parent 1d477ee42a
commit 4fde462bce
7743 changed files with 1397833 additions and 18 deletions

View File

@@ -0,0 +1,264 @@
#ifndef AWS_TESTING_ASYNC_STREAM_TESTER_H
#define AWS_TESTING_ASYNC_STREAM_TESTER_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/io/async_stream.h>
#include <aws/common/byte_buf.h>
#include <aws/common/condition_variable.h>
#include <aws/common/mutex.h>
#include <aws/common/thread.h>
#include <aws/io/future.h>
#include <aws/testing/stream_tester.h>
#ifndef AWS_UNSTABLE_TESTING_API
# error This code is designed for use by AWS owned libraries for the AWS C99 SDK. \
You are welcome to use it, but we make no promises on the stability of this API. \
To enable use of this code, set the AWS_UNSTABLE_TESTING_API compiler flag.
#endif
/**
* Use aws_async_input_stream_tester to test edge cases in systems that take async streams.
* You can customize its behavior (e.g. fail on 3rd read, always complete async, always complete synchronously, etc)
*/
/* Controls which thread an async read() completes on. */
enum aws_async_read_completion_strategy {
    /* the tester has its own thread, and reads always complete from there */
    AWS_ASYNC_READ_COMPLETES_ON_ANOTHER_THREAD,
    /* reads complete before read() even returns */
    AWS_ASYNC_READ_COMPLETES_IMMEDIATELY,
    /* sometimes reads complete immediately, sometimes they complete on another thread */
    AWS_ASYNC_READ_COMPLETES_ON_RANDOM_THREAD,
};
/* Configuration for aws_async_input_stream_new_tester(). */
struct aws_async_input_stream_tester_options {
    /* the async tester uses the synchronous tester under the hood,
     * so here are those options */
    struct aws_input_stream_tester_options base;

    /* where read() calls are allowed to complete */
    enum aws_async_read_completion_strategy completion_strategy;

    /* if non-zero, a read will take at least this long to complete */
    uint64_t read_duration_ns;
};
/* Async input stream whose completion behavior tests can control.
 * Wraps a synchronous aws_input_stream_tester that produces the data. */
struct aws_async_input_stream_tester {
    struct aws_async_input_stream base;
    struct aws_allocator *alloc;
    struct aws_async_input_stream_tester_options options;

    /* the wrapped synchronous stream that actually services reads */
    struct aws_input_stream *source_stream;

    /* worker thread; only launched when reads may complete off-thread */
    struct aws_thread thread;

    /* state shared with the worker thread; always accessed under lock */
    struct {
        struct aws_mutex lock;
        struct aws_condition_variable cvar;
        /* when thread should perform a read, these are set */
        struct aws_byte_buf *read_dest;
        struct aws_future_bool *read_future;
        /* if true, thread should shut down */
        bool do_shutdown;
    } synced_data;

    /* used to fatal-assert that read() calls never overlap */
    struct aws_atomic_var num_outstanding_reads;
};
/* Perform the actual (synchronous) read behind an async read() call, then
 * complete and release the future. Runs either inline from read() or on the
 * tester's worker thread, depending on the completion strategy. */
static inline void s_async_input_stream_tester_do_actual_read(
    struct aws_async_input_stream_tester *impl,
    struct aws_byte_buf *dest,
    struct aws_future_bool *read_future) {

    int error_code = 0;

    /* delay, if that's how we're configured */
    if (impl->options.read_duration_ns != 0) {
        aws_thread_current_sleep(impl->options.read_duration_ns);
    }

    /* Keep calling read() until we get some data, or hit EOF.
     * We do this because the synchronous aws_input_stream API allows
     * 0 byte reads, but the aws_async_input_stream API does not. */
    size_t prev_len = dest->len;
    struct aws_stream_status status = {.is_end_of_stream = false, .is_valid = true};
    while ((dest->len == prev_len) && !status.is_end_of_stream) {
        /* read from stream */
        if (aws_input_stream_read(impl->source_stream, dest) != AWS_OP_SUCCESS) {
            error_code = aws_last_error();
            goto done;
        }

        /* check if stream is done */
        if (aws_input_stream_get_status(impl->source_stream, &status) != AWS_OP_SUCCESS) {
            error_code = aws_last_error();
            goto done;
        }
    }

done:
    /* Decrement the outstanding-read count BEFORE completing the future, so a
     * completion callback can immediately issue the next read without
     * tripping the overlapping-read assertion in read(). */
    aws_atomic_fetch_sub(&impl->num_outstanding_reads, 1);

    if (error_code != 0) {
        aws_future_bool_set_error(read_future, error_code);
    } else {
        /* the future's result is "did we reach end-of-stream" */
        aws_future_bool_set_result(read_future, status.is_end_of_stream);
    }

    /* release the extra refcount acquired when this read was dispatched */
    aws_future_bool_release(read_future);
}
/* vtable read() implementation: kick off a read and return a future that
 * completes with true once end-of-stream is reached. Depending on
 * options.completion_strategy, the work runs inline, on the worker thread,
 * or randomly on either. */
static inline struct aws_future_bool *s_async_input_stream_tester_read(
    struct aws_async_input_stream *stream,
    struct aws_byte_buf *dest) {

    struct aws_async_input_stream_tester *impl = (struct aws_async_input_stream_tester *)stream->impl;

    /* the async-stream contract forbids a second read() before the previous future completes */
    size_t prev_outstanding_reads = aws_atomic_fetch_add(&impl->num_outstanding_reads, 1);
    AWS_FATAL_ASSERT(prev_outstanding_reads == 0 && "Overlapping read() calls are forbidden");

    struct aws_future_bool *read_future = aws_future_bool_new(stream->alloc);

    bool do_on_thread = false;
    switch (impl->options.completion_strategy) {
        case AWS_ASYNC_READ_COMPLETES_ON_ANOTHER_THREAD:
            do_on_thread = true;
            break;
        case AWS_ASYNC_READ_COMPLETES_IMMEDIATELY:
            do_on_thread = false;
            break;
        case AWS_ASYNC_READ_COMPLETES_ON_RANDOM_THREAD:
            /* NOTE(review): rand() is never seeded here, so the sequence is
             * deterministic per process — presumably acceptable for tests */
            do_on_thread = (rand() % 2 == 0);
            break;
    }

    if (do_on_thread) {
        /* hand the work to the worker thread; it holds its own refcount on the future */
        /* BEGIN CRITICAL SECTION */
        aws_mutex_lock(&impl->synced_data.lock);
        impl->synced_data.read_dest = dest;
        impl->synced_data.read_future = aws_future_bool_acquire(read_future);
        AWS_FATAL_ASSERT(aws_condition_variable_notify_all(&impl->synced_data.cvar) == AWS_OP_SUCCESS);
        aws_mutex_unlock(&impl->synced_data.lock);
        /* END CRITICAL SECTION */
    } else {
        /* acquire additional refcount on future, since we call release once it's complete */
        aws_future_bool_acquire(read_future);
        s_async_input_stream_tester_do_actual_read(impl, dest, read_future);
    }

    return read_future;
}
static inline void s_async_input_stream_tester_do_actual_destroy(struct aws_async_input_stream_tester *impl) {
if (impl->options.completion_strategy != AWS_ASYNC_READ_COMPLETES_IMMEDIATELY) {
aws_condition_variable_clean_up(&impl->synced_data.cvar);
aws_mutex_clean_up(&impl->synced_data.lock);
}
aws_input_stream_release(impl->source_stream);
aws_mem_release(impl->base.alloc, impl);
}
/* refcount has reached zero */
static inline void s_async_input_stream_tester_destroy(struct aws_async_input_stream *async_stream) {
    struct aws_async_input_stream_tester *impl = (struct aws_async_input_stream_tester *)async_stream->impl;

    if (impl->options.completion_strategy == AWS_ASYNC_READ_COMPLETES_IMMEDIATELY) {
        /* no worker thread exists; tear everything down right here */
        s_async_input_stream_tester_do_actual_destroy(impl);
    } else {
        /* signal thread to finish cleaning things up */
        /* BEGIN CRITICAL SECTION */
        aws_mutex_lock(&impl->synced_data.lock);
        impl->synced_data.do_shutdown = true;
        AWS_FATAL_ASSERT(aws_condition_variable_notify_all(&impl->synced_data.cvar) == AWS_OP_SUCCESS);
        aws_mutex_unlock(&impl->synced_data.lock);
        /* END CRITICAL SECTION */
    }
}
static inline bool s_async_input_stream_tester_thread_pred(void *arg) {
struct aws_async_input_stream_tester *impl = (struct aws_async_input_stream_tester *)arg;
return impl->synced_data.do_shutdown || (impl->synced_data.read_dest != NULL);
}
/* Worker thread: loop servicing reads until told to shut down.
 * Once destroy() signals do_shutdown, the thread owns the tester and
 * performs final destruction itself. */
static inline void s_async_input_stream_tester_thread(void *arg) {
    struct aws_async_input_stream_tester *impl = (struct aws_async_input_stream_tester *)arg;

    bool do_shutdown = false;
    struct aws_byte_buf *read_dest = NULL;
    struct aws_future_bool *read_future = NULL;

    while (!do_shutdown) {
        /* BEGIN CRITICAL SECTION */
        aws_mutex_lock(&impl->synced_data.lock);
        AWS_FATAL_ASSERT(
            aws_condition_variable_wait_pred(
                &impl->synced_data.cvar, &impl->synced_data.lock, s_async_input_stream_tester_thread_pred, impl) ==
            AWS_OP_SUCCESS);

        /* acquire work */
        do_shutdown = impl->synced_data.do_shutdown;
        read_dest = impl->synced_data.read_dest;
        impl->synced_data.read_dest = NULL;
        read_future = impl->synced_data.read_future;
        impl->synced_data.read_future = NULL;
        aws_mutex_unlock(&impl->synced_data.lock);
        /* END CRITICAL SECTION */

        /* a read may arrive together with the shutdown signal; service it first */
        if (read_dest != NULL) {
            s_async_input_stream_tester_do_actual_read(impl, read_dest, read_future);
        }
    }

    /* thread has shut down, finish destruction */
    s_async_input_stream_tester_do_actual_destroy(impl);
}
/* Reports how many bytes have been read so far, as tracked by the wrapped
 * synchronous stream tester. */
static inline uint64_t aws_async_input_stream_tester_total_bytes_read(
    const struct aws_async_input_stream *async_stream) {

    const struct aws_async_input_stream_tester *tester =
        (const struct aws_async_input_stream_tester *)async_stream->impl;
    const struct aws_input_stream_tester *source_tester =
        (const struct aws_input_stream_tester *)tester->source_stream->impl;

    return source_tester->total_bytes_read;
}
/* Wires the tester into the aws_async_input_stream interface. */
static struct aws_async_input_stream_vtable s_async_input_stream_tester_vtable = {
    .destroy = s_async_input_stream_tester_destroy,
    .read = s_async_input_stream_tester_read,
};
/* Creates a new async-input-stream tester. Wraps a synchronous
 * aws_input_stream_tester (configured via options->base) and, unless reads
 * complete immediately, launches a managed worker thread. */
static inline struct aws_async_input_stream *aws_async_input_stream_new_tester(
    struct aws_allocator *alloc,
    const struct aws_async_input_stream_tester_options *options) {

    struct aws_async_input_stream_tester *impl =
        (struct aws_async_input_stream_tester *)aws_mem_calloc(alloc, 1, sizeof(struct aws_async_input_stream_tester));

    aws_async_input_stream_init_base(&impl->base, alloc, &s_async_input_stream_tester_vtable, impl);
    impl->options = *options;
    aws_atomic_init_int(&impl->num_outstanding_reads, 0);

    /* the wrapped synchronous stream performs the actual reads */
    impl->source_stream = aws_input_stream_new_tester(alloc, &options->base);
    AWS_FATAL_ASSERT(impl->source_stream);

    if (options->completion_strategy != AWS_ASYNC_READ_COMPLETES_IMMEDIATELY) {
        aws_mutex_init(&impl->synced_data.lock);
        aws_condition_variable_init(&impl->synced_data.cvar);

        AWS_FATAL_ASSERT(aws_thread_init(&impl->thread, alloc) == AWS_OP_SUCCESS);
        struct aws_thread_options thread_options = *aws_default_thread_options();
        thread_options.name = aws_byte_cursor_from_c_str("AsyncStream");
        /* managed join: the aws_thread library joins this thread during library cleanup */
        thread_options.join_strategy = AWS_TJS_MANAGED;
        AWS_FATAL_ASSERT(
            aws_thread_launch(&impl->thread, s_async_input_stream_tester_thread, impl, &thread_options) ==
            AWS_OP_SUCCESS);
    }

    return &impl->base;
}
#endif /* AWS_TESTING_ASYNC_STREAM_TESTER_H */

View File

@@ -0,0 +1,572 @@
#ifndef AWS_TESTING_AWS_TEST_HARNESS_H
#define AWS_TESTING_AWS_TEST_HARNESS_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/common/common.h>
#include <aws/common/error.h>
#include <aws/common/logging.h>
#include <aws/common/mutex.h>
#include <aws/common/system_info.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/**
* The return code for skipped tests. Use the return code if the test should be skipped.
*/
#define AWS_OP_SKIP (-2)
#ifndef AWS_UNSTABLE_TESTING_API
# error The AWS Test Fixture is designed only for use by AWS owned libraries for the AWS C99 SDK. You are welcome to use it, \
but you should be aware we make no promises on the stability of this API. To enable use of the aws test fixtures, set \
the AWS_UNSTABLE_TESTING_API compiler flag
#endif
#ifndef AWS_TESTING_REPORT_FD
# define AWS_TESTING_REPORT_FD stderr
#endif
#ifdef _MSC_VER
# pragma warning(disable : 4221) /* aggregate initializer using local variable addresses */
# pragma warning(disable : 4204) /* non-constant aggregate initializer */
#endif
#if defined(__clang__)
# pragma clang diagnostic ignored "-Wgnu-zero-variadic-macro-arguments"
#endif
/** Prints a failure message to AWS_TESTING_REPORT_FD in printf style, followed by
 * the function, file and line number when both function and file are provided.
 * Returns 0 without printing anything when format is NULL; otherwise returns 1.
 */
static int s_cunit_failure_message0(
    const char *prefix,
    const char *function,
    const char *file,
    int line,
    const char *format,
    ...) {

    if (format == NULL) {
        /* no message supplied: report "nothing printed" */
        return 0;
    }

    fprintf(AWS_TESTING_REPORT_FD, "%s", prefix);

    va_list args;
    va_start(args, format);
    vfprintf(AWS_TESTING_REPORT_FD, format, args);
    va_end(args);

    if (function != NULL && file != NULL) {
        fprintf(AWS_TESTING_REPORT_FD, " [%s(): %s:%d]\n", function, file, line);
    } else {
        fprintf(AWS_TESTING_REPORT_FD, "\n");
    }

    return 1;
}
#define FAIL_PREFIX "***FAILURE*** "

/* Forwards a failure message (plus any extra printf arguments) to
 * s_cunit_failure_message0().
 * BUG FIX: this must paste the variadic arguments through with ##__VA_ARGS__.
 * The previous #__VA_ARGS__ (stringify) collapsed them into a single string
 * literal, so the conversion specifiers in `format` read mismatched varargs. */
#define CUNIT_FAILURE_MESSAGE(func, file, line, format, ...)                                                           \
    s_cunit_failure_message0(FAIL_PREFIX, func, file, line, format, ##__VA_ARGS__)

#define SUCCESS (0)
#define FAILURE (-1)

/* The exit code returned to ctest to indicate the test is skipped. Refer to cmake doc:
 * https://cmake.org/cmake/help/latest/prop_test/SKIP_RETURN_CODE.html
 * The value has no special meaning, it's just an arbitrary exit code reducing the chance of clashing with exit codes
 * that may be returned from various tools (e.g. sanitizer). */
#define SKIP (103)

/* Prints the success banner and returns SUCCESS from the test function. */
#define RETURN_SUCCESS(format, ...)                                                                                    \
    do {                                                                                                               \
        printf(format, ##__VA_ARGS__);                                                                                 \
        printf("\n");                                                                                                  \
        return SUCCESS;                                                                                                \
    } while (0)

/* Failure printers. The trailing (const char *)NULL sentinel lets each macro be
 * invoked with no arguments: format becomes NULL and s_cunit_failure_message0()
 * prints nothing and returns 0, which callers use to fall back to a default message. */
#define PRINT_FAIL_INTERNAL(...) CUNIT_FAILURE_MESSAGE(__func__, __FILE__, __LINE__, ##__VA_ARGS__, (const char *)NULL)

#define PRINT_FAIL_INTERNAL0(...)                                                                                      \
    s_cunit_failure_message0(FAIL_PREFIX, __func__, __FILE__, __LINE__, ##__VA_ARGS__, (const char *)NULL)

#define PRINT_FAIL_WITHOUT_LOCATION(...)                                                                               \
    s_cunit_failure_message0(FAIL_PREFIX, NULL, NULL, __LINE__, ##__VA_ARGS__, (const char *)NULL)

/* Returns FAILURE from the enclosing test function. */
#define POSTFAIL_INTERNAL()                                                                                            \
    do {                                                                                                               \
        return FAILURE;                                                                                                \
    } while (0)

/* Prints the failure message and fails the test immediately. */
#define FAIL(...)                                                                                                      \
    do {                                                                                                               \
        PRINT_FAIL_INTERNAL0(__VA_ARGS__);                                                                             \
        POSTFAIL_INTERNAL();                                                                                           \
    } while (0)
/* Asserts `condition` is true; on failure prints the optional printf-style
 * message (or, if none was given, the condition text) and returns FAILURE. */
#define ASSERT_TRUE(condition, ...)                                                                                    \
    do {                                                                                                               \
        if (!(condition)) {                                                                                            \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("Expected condition to be true: " #condition);                                    \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* Asserts `condition` is false. */
#define ASSERT_FALSE(condition, ...)                                                                                   \
    do {                                                                                                               \
        if ((condition)) {                                                                                             \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("Expected condition to be false: " #condition);                                   \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* Asserts an AWS operation returned AWS_OP_SUCCESS; `condition` is evaluated
 * exactly once. */
#define ASSERT_SUCCESS(condition, ...)                                                                                 \
    do {                                                                                                               \
        int assert_rv = (condition);                                                                                   \
        if (assert_rv != AWS_OP_SUCCESS) {                                                                             \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0(                                                                                  \
                    "Expected success at %s; got return value %d with last error %d\n",                                \
                    #condition,                                                                                        \
                    assert_rv,                                                                                         \
                    aws_last_error());                                                                                 \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)
/* Asserts an AWS operation returned AWS_OP_ERR (any error code). */
#define ASSERT_FAILS(condition, ...)                                                                                   \
    do {                                                                                                               \
        int assert_rv = (condition);                                                                                   \
        if (assert_rv != AWS_OP_ERR) {                                                                                 \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0(                                                                                  \
                    "Expected failure at %s; got return value %d with last error %d\n",                                \
                    #condition,                                                                                        \
                    assert_rv,                                                                                         \
                    aws_last_error());                                                                                 \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* Asserts an AWS operation failed AND raised the specific error code.
 * aws_last_error() is captured immediately after evaluating `condition`,
 * before any other call can overwrite it. */
#define ASSERT_ERROR(error, condition, ...)                                                                            \
    do {                                                                                                               \
        int assert_rv = (condition);                                                                                   \
        int assert_err = aws_last_error();                                                                             \
        int assert_err_expect = (error);                                                                               \
        if (assert_rv != AWS_OP_ERR) {                                                                                 \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD,                                                                                 \
                "%sExpected error but no error occurred; rv=%d, aws_last_error=%d (expected %d): ",                    \
                FAIL_PREFIX,                                                                                           \
                assert_rv,                                                                                             \
                assert_err,                                                                                            \
                assert_err_expect);                                                                                    \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s", #condition);                                                                \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
        if (assert_err != assert_err_expect) {                                                                         \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD,                                                                                 \
                "%sIncorrect error code; aws_last_error=%d (expected %d): ",                                           \
                FAIL_PREFIX,                                                                                           \
                assert_err,                                                                                            \
                assert_err_expect);                                                                                    \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s", #condition);                                                                \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)
/* Asserts the expression is NULL (the uintptr_t cast also accepts integers). */
#define ASSERT_NULL(ptr, ...)                                                                                          \
    do {                                                                                                               \
        /* XXX: Some tests use ASSERT_NULL on ints... */                                                               \
        void *assert_p = (void *)(uintptr_t)(ptr);                                                                     \
        if (assert_p) {                                                                                                \
            fprintf(AWS_TESTING_REPORT_FD, "%sExpected null but got %p: ", FAIL_PREFIX, assert_p);                     \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s", #ptr);                                                                      \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* Asserts the expression is non-NULL. */
#define ASSERT_NOT_NULL(ptr, ...)                                                                                      \
    do {                                                                                                               \
        /* XXX: Some tests use ASSERT_NOT_NULL on ints... */                                                           \
        void *assert_p = (void *)(uintptr_t)(ptr);                                                                     \
        if (!assert_p) {                                                                                               \
            fprintf(AWS_TESTING_REPORT_FD, "%sExpected non-null but got null: ", FAIL_PREFIX);                         \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s", #ptr);                                                                      \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)
/* Core typed-equality assertion: evaluates each side exactly once, compares
 * with !=, and prints both values using `formatarg` on mismatch. */
#define ASSERT_TYP_EQUALS(type, formatarg, expected, got, ...)                                                         \
    do {                                                                                                               \
        type assert_expected = (expected);                                                                             \
        type assert_actual = (got);                                                                                    \
        if (assert_expected != assert_actual) {                                                                        \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD,                                                                                 \
                "%s" formatarg " != " formatarg ": ",                                                                  \
                FAIL_PREFIX,                                                                                           \
                assert_expected,                                                                                       \
                assert_actual);                                                                                        \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s != %s", #expected, #got);                                                     \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

#ifdef _MSC_VER
/* MSVC variants use %lld/%llu instead of the C99 %jd/%ju length modifiers
 * (presumably because older MSVC runtimes lacked %j support — TODO confirm) */
# define ASSERT_INT_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(intmax_t, "%lld", expected, got, __VA_ARGS__)
# define ASSERT_UINT_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(uintmax_t, "%llu", expected, got, __VA_ARGS__)
#else
/* For comparing any signed integer types */
# define ASSERT_INT_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(intmax_t, "%jd", expected, got, __VA_ARGS__)
/* For comparing any unsigned integer types */
# define ASSERT_UINT_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(uintmax_t, "%ju", expected, got, __VA_ARGS__)
#endif
/* Asserts two pointers compare equal (the uintptr_t casts also accept integers). */
#define ASSERT_PTR_EQUALS(expected, got, ...)                                                                          \
    do {                                                                                                               \
        void *assert_expected = (void *)(uintptr_t)(expected);                                                         \
        void *assert_actual = (void *)(uintptr_t)(got);                                                                \
        if (assert_expected != assert_actual) {                                                                        \
            fprintf(AWS_TESTING_REPORT_FD, "%s%p != %p: ", FAIL_PREFIX, assert_expected, assert_actual);               \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("%s != %s", #expected, #got);                                                     \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* note that uint8_t is promoted to unsigned int in varargs, so %02x is an acceptable format string */
#define ASSERT_BYTE_HEX_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(uint8_t, "%02X", expected, got, __VA_ARGS__)
#define ASSERT_HEX_EQUALS(expected, got, ...) ASSERT_TYP_EQUALS(unsigned long long, "%llX", expected, got, __VA_ARGS__)
/* Asserts two NUL-terminated strings are both non-null and equal per strcmp(). */
#define ASSERT_STR_EQUALS(expected, got, ...)                                                                          \
    do {                                                                                                               \
        const char *assert_expected = (expected);                                                                      \
        const char *assert_got = (got);                                                                                \
        ASSERT_NOT_NULL(assert_expected);                                                                              \
        ASSERT_NOT_NULL(assert_got);                                                                                   \
        if (strcmp(assert_expected, assert_got) != 0) {                                                                \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD, "%sExpected: \"%s\"; got: \"%s\": ", FAIL_PREFIX, assert_expected, assert_got); \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("ASSERT_STR_EQUALS(%s, %s)", #expected, #got);                                    \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)
/* Asserts two byte buffers have equal size and contents. Two empty buffers
 * always pass (even if a pointer is NULL). On a content mismatch in buffers
 * up to 1024 bytes, the first differing byte is reported. */
#define ASSERT_BIN_ARRAYS_EQUALS(expected, expected_size, got, got_size, ...)                                          \
    do {                                                                                                               \
        const uint8_t *assert_ex_p = (const uint8_t *)(expected);                                                      \
        size_t assert_ex_s = (expected_size);                                                                          \
        const uint8_t *assert_got_p = (const uint8_t *)(got);                                                          \
        size_t assert_got_s = (got_size);                                                                              \
        if (assert_ex_s == 0 && assert_got_s == 0) {                                                                   \
            break;                                                                                                     \
        }                                                                                                              \
        if (assert_ex_s != assert_got_s) {                                                                             \
            fprintf(AWS_TESTING_REPORT_FD, "%sSize mismatch: %zu != %zu: ", FAIL_PREFIX, assert_ex_s, assert_got_s);   \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0(                                                                                  \
                    "ASSERT_BIN_ARRAYS_EQUALS(%s, %s, %s, %s)", #expected, #expected_size, #got, #got_size);           \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
        if (memcmp(assert_ex_p, assert_got_p, assert_got_s) != 0) {                                                    \
            if (assert_got_s <= 1024) {                                                                                \
                for (size_t assert_i = 0; assert_i < assert_ex_s; ++assert_i) {                                        \
                    if (assert_ex_p[assert_i] != assert_got_p[assert_i]) {                                             \
                        fprintf(                                                                                       \
                            AWS_TESTING_REPORT_FD,                                                                     \
                            "%sMismatch at byte[%zu]: 0x%02X != 0x%02X: ",                                             \
                            FAIL_PREFIX,                                                                               \
                            assert_i,                                                                                  \
                            assert_ex_p[assert_i],                                                                     \
                            assert_got_p[assert_i]);                                                                   \
                        break;                                                                                         \
                    }                                                                                                  \
                }                                                                                                      \
            } else {                                                                                                   \
                fprintf(AWS_TESTING_REPORT_FD, "%sData mismatch: ", FAIL_PREFIX);                                      \
            }                                                                                                          \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0(                                                                                  \
                    "ASSERT_BIN_ARRAYS_EQUALS(%s, %s, %s, %s)", #expected, #expected_size, #got, #got_size);           \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)
/* Asserts an aws_byte_cursor's bytes equal a NUL-terminated C string.
 * NOTE(review): the fallback failure text says "..._STRING_EQUALS" even though
 * this is the CSTRING variant; text kept as-is since it is runtime output. */
#define ASSERT_CURSOR_VALUE_CSTRING_EQUALS(cursor, cstring, ...)                                                       \
    do {                                                                                                               \
        const uint8_t *assert_ex_p = (const uint8_t *)((cursor).ptr);                                                  \
        size_t assert_ex_s = (cursor).len;                                                                             \
        const uint8_t *assert_got_p = (const uint8_t *)cstring;                                                        \
        size_t assert_got_s = strlen(cstring);                                                                         \
        if (assert_ex_s == 0 && assert_got_s == 0) {                                                                   \
            break;                                                                                                     \
        }                                                                                                              \
        if (assert_ex_s != assert_got_s) {                                                                             \
            fprintf(AWS_TESTING_REPORT_FD, "%sSize mismatch: %zu != %zu: \n", FAIL_PREFIX, assert_ex_s, assert_got_s); \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD,                                                                                 \
                "%sGot: \"" PRInSTR "\"; Expected: \"%s\" \n",                                                         \
                FAIL_PREFIX,                                                                                           \
                AWS_BYTE_CURSOR_PRI(cursor),                                                                           \
                cstring);                                                                                              \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("ASSERT_CURSOR_VALUE_STRING_EQUALS(%s, %s)", #cursor, #cstring);                  \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
        if (memcmp(assert_ex_p, assert_got_p, assert_got_s) != 0) {                                                    \
            fprintf(                                                                                                   \
                AWS_TESTING_REPORT_FD,                                                                                 \
                "%sData mismatch; Got: \"" PRInSTR "\"; Expected: \"%s\" \n",                                          \
                FAIL_PREFIX,                                                                                           \
                AWS_BYTE_CURSOR_PRI(cursor),                                                                           \
                cstring);                                                                                              \
            if (!PRINT_FAIL_INTERNAL0(__VA_ARGS__)) {                                                                  \
                PRINT_FAIL_INTERNAL0("ASSERT_CURSOR_VALUE_STRING_EQUALS(%s, %s)", #cursor, #cstring);                  \
            }                                                                                                          \
            POSTFAIL_INTERNAL();                                                                                       \
        }                                                                                                              \
    } while (0)

/* Asserts a cursor's bytes equal an aws_string's contents.
 * NOTE(review): extra varargs are accepted but not forwarded, and the
 * expansion ends with a semicolon — kept as-is for compatibility. */
#define ASSERT_CURSOR_VALUE_STRING_EQUALS(cursor, string, ...)                                                         \
    ASSERT_CURSOR_VALUE_CSTRING_EQUALS(cursor, aws_string_c_str(string));
/* Test fixture callbacks: setup, the test body, and teardown (teardown
 * receives the setup result so it can adapt). */
typedef int(aws_test_before_fn)(struct aws_allocator *allocator, void *ctx);
typedef int(aws_test_run_fn)(struct aws_allocator *allocator, void *ctx);
typedef int(aws_test_after_fn)(struct aws_allocator *allocator, int setup_result, void *ctx);

/* Describes one test case for s_aws_run_test_case(). */
struct aws_test_harness {
    aws_test_before_fn *on_before;
    aws_test_run_fn *run;
    aws_test_after_fn *on_after;
    void *ctx;
    const char *test_name;
    /* if non-zero, the leak-tracking allocator is not used */
    int suppress_memcheck;
};
#if defined(_WIN32)
# include <windows.h>

/* Windows unhandled-exception filter: dump a backtrace to stderr, then let
 * the default handler terminate the process. */
static LONG WINAPI s_test_print_stack_trace(struct _EXCEPTION_POINTERS *exception_pointers) {
# if !defined(AWS_HEADER_CHECKER)
    aws_backtrace_print(stderr, exception_pointers);
# endif
    return EXCEPTION_EXECUTE_HANDLER;
}
#elif defined(AWS_HAVE_EXECINFO)
# include <signal.h>

/* SIGSEGV handler: dump a backtrace to stderr and exit immediately.
 * (The (void) casts keep the build clean when AWS_HEADER_CHECKER is defined
 * and the parameters go unused.) */
static void s_print_stack_trace(int sig, siginfo_t *sig_info, void *user_data) {
    (void)sig;
    (void)sig_info;
    (void)user_data;
# if !defined(AWS_HEADER_CHECKER)
    aws_backtrace_print(stderr, sig_info);
# endif
    exit(-1);
}
#endif
/* Runs a single test case end-to-end: installs crash handlers, wires up a
 * leak-tracking allocator and a stderr logger, invokes setup/run/teardown,
 * then reports OK / SKIP / FAILED. Failing or skipped tests terminate the
 * process via _Exit() rather than returning. */
static inline int s_aws_run_test_case(struct aws_test_harness *harness) {
    AWS_ASSERT(harness->run);

/*
 * MSVC compiler has a weird interactive pop-up in debug whenever 'abort()' is called, which can be triggered
 * by hitting any aws_assert or aws_pre_condition, causing the CI to hang. So disable the pop-up in tests.
 */
#ifdef _MSC_VER
    _set_abort_behavior(0, _WRITE_ABORT_MSG | _CALL_REPORTFAULT);
#endif

#if defined(_WIN32)
    SetUnhandledExceptionFilter(s_test_print_stack_trace);
    /* Set working directory to path to this exe */
    char cwd[512];
    DWORD len = GetModuleFileNameA(NULL, cwd, sizeof(cwd));
    /* walk backwards to the last path separator and truncate there */
    DWORD idx = len - 1;
    while (idx && cwd[idx] != '\\') {
        idx--;
    }
    cwd[idx] = 0;
    SetCurrentDirectory(cwd);
#elif defined(AWS_HAVE_EXECINFO)
    /* print a backtrace if the test segfaults */
    struct sigaction sa;
    memset(&sa, 0, sizeof(struct sigaction));
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_NODEFER;
    sa.sa_sigaction = s_print_stack_trace;
    sigaction(SIGSEGV, &sa, NULL);
#endif

    /* track allocations and report leaks in tests, unless suppressed */
    struct aws_allocator *allocator = NULL;
    if (harness->suppress_memcheck) {
        allocator = aws_default_allocator();
    } else {
        allocator = aws_mem_tracer_new(aws_default_allocator(), NULL, AWS_MEMTRACE_STACKS, 8);
    }

    /* wire up a logger to stderr by default, may be replaced by some tests */
    struct aws_logger err_logger;
    struct aws_logger_standard_options options;
    options.file = AWS_TESTING_REPORT_FD;
    options.level = AWS_LL_TRACE;
    options.filename = NULL;
    aws_logger_init_standard(&err_logger, aws_default_allocator(), &options);
    aws_logger_set(&err_logger);

    int test_res = AWS_OP_ERR;
    int setup_res = AWS_OP_SUCCESS;
    if (harness->on_before) {
        setup_res = harness->on_before(allocator, harness->ctx);
    }

    /* the test body only runs if setup succeeded */
    if (!setup_res) {
        test_res = harness->run(allocator, harness->ctx);
    }

    /* teardown (if any) always runs; its result is OR'd into the test result */
    if (harness->on_after) {
        test_res |= harness->on_after(allocator, setup_res, harness->ctx);
    }

    if (test_res != AWS_OP_SUCCESS) {
        goto fail;
    }

    if (!harness->suppress_memcheck) {
        /* Reset the logger, as test can set their own logger and clean it up,
         * but aws_mem_tracer_dump() needs a valid logger to be active */
        aws_logger_set(&err_logger);
        const size_t leaked_allocations = aws_mem_tracer_count(allocator);
        const size_t leaked_bytes = aws_mem_tracer_bytes(allocator);
        if (leaked_bytes) {
            aws_mem_tracer_dump(allocator);
            PRINT_FAIL_WITHOUT_LOCATION(
                "Test leaked memory: %zu bytes %zu allocations", leaked_bytes, leaked_allocations);
            goto fail;
        }
        aws_mem_tracer_destroy(allocator);
    }

    aws_logger_set(NULL);
    aws_logger_clean_up(&err_logger);

    RETURN_SUCCESS("%s [ \033[32mOK\033[0m ]", harness->test_name);

fail:
    if (test_res == AWS_OP_SKIP) {
        fprintf(AWS_TESTING_REPORT_FD, "%s [ \033[32mSKIP\033[0m ]\n", harness->test_name);
    } else {
        PRINT_FAIL_WITHOUT_LOCATION("%s [ \033[31mFAILED\033[0m ]", harness->test_name);
    }

    /* Use _Exit() to terminate without cleaning up resources.
     * This prevents LeakSanitizer spam (yes, we know failing tests don't bother cleaning up).
     * It also prevents errors where threads that haven't cleaned are still using the logger declared in this fn. */
    fflush(AWS_TESTING_REPORT_FD);
    fflush(stdout);
    fflush(stderr);
    _Exit(test_res == AWS_OP_SKIP ? SKIP : FAILURE);
}
/* Enables terminal escape sequences for text coloring on Windows. */
/* https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences */
#ifdef _WIN32
# include <windows.h>
# ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING
#  define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x0004
# endif

/* Turn on virtual-terminal processing for stdout so the colored OK/FAILED
 * banners render; returns AWS_OP_ERR if any console call fails. */
static inline int enable_vt_mode(void) {
    HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE);
    if (hOut == INVALID_HANDLE_VALUE) {
        return AWS_OP_ERR;
    }

    DWORD dwMode = 0;
    if (!GetConsoleMode(hOut, &dwMode)) {
        return AWS_OP_ERR;
    }

    dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
    if (!SetConsoleMode(hOut, dwMode)) {
        return AWS_OP_ERR;
    }
    return AWS_OP_SUCCESS;
}
#else
/* Non-Windows builds: nothing to enable; always reports AWS_OP_ERR. */
static inline int enable_vt_mode(void) {
    return AWS_OP_ERR;
}
#endif
/* Declares test entry point `name` wrapping static test function `fn`;
 * `s` non-zero suppresses the leak check.
 * NOTE(review): "SUPRESSION" is misspelled, but the macro name is public API
 * used by callers (see AWS_TEST_CASE below), so it cannot be renamed here. */
#define AWS_TEST_CASE_SUPRESSION(name, fn, s)                                                                          \
    static int fn(struct aws_allocator *allocator, void *ctx);                                                         \
    static struct aws_test_harness name##_test = {                                                                     \
        NULL,                                                                                                          \
        fn,                                                                                                            \
        NULL,                                                                                                          \
        NULL,                                                                                                          \
        #name,                                                                                                         \
        s,                                                                                                             \
    };                                                                                                                 \
    int name(int argc, char *argv[]) {                                                                                 \
        (void)argc, (void)argv;                                                                                        \
        return s_aws_run_test_case(&name##_test);                                                                      \
    }

/* Same as above, but with before (`b`) / after (`af`) fixture callbacks and a
 * ctx pointer (`c`) passed to all three functions. */
#define AWS_TEST_CASE_FIXTURE_SUPPRESSION(name, b, fn, af, c, s)                                                       \
    static int b(struct aws_allocator *allocator, void *ctx);                                                          \
    static int fn(struct aws_allocator *allocator, void *ctx);                                                         \
    static int af(struct aws_allocator *allocator, int setup_result, void *ctx);                                       \
    static struct aws_test_harness name##_test = {                                                                     \
        b,                                                                                                             \
        fn,                                                                                                            \
        af,                                                                                                            \
        c,                                                                                                             \
        #name,                                                                                                         \
        s,                                                                                                             \
    };                                                                                                                 \
    int name(int argc, char *argv[]) {                                                                                 \
        (void)argc;                                                                                                    \
        (void)argv;                                                                                                    \
        return s_aws_run_test_case(&name##_test);                                                                      \
    }

/* Convenience wrappers with the leak check enabled. */
#define AWS_TEST_CASE(name, fn) AWS_TEST_CASE_SUPRESSION(name, fn, 0)
#define AWS_TEST_CASE_FIXTURE(name, b, fn, af, c) AWS_TEST_CASE_FIXTURE_SUPPRESSION(name, b, fn, af, c, 0)
#endif /* AWS_TESTING_AWS_TEST_HARNESS_H */

View File

@@ -0,0 +1,637 @@
#ifndef AWS_TESTING_IO_TESTING_CHANNEL_H
#define AWS_TESTING_IO_TESTING_CHANNEL_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/common/clock.h>
#include <aws/common/task_scheduler.h>
#include <aws/io/channel.h>
#include <aws/io/event_loop.h>
#include <aws/io/logging.h>
#include <aws/io/statistics.h>
#include <aws/testing/aws_test_harness.h>
/* Mock event-loop state: a task scheduler that tests pump manually, plus a
 * flag controlling what is_on_callers_thread() reports. */
struct testing_loop {
    struct aws_task_scheduler scheduler;
    bool mock_on_callers_thread;
};
/* The testing loop has no real thread to start; running it is a no-op. */
static int s_testing_loop_run(struct aws_event_loop *loop) {
    (void)loop;
    return AWS_OP_SUCCESS;
}
/* Nothing is running, so stopping the testing loop is a no-op. */
static int s_testing_loop_stop(struct aws_event_loop *loop) {
    (void)loop;
    return AWS_OP_SUCCESS;
}
/* Stop is instantaneous for the testing loop; nothing to wait on. */
static int s_testing_loop_wait_for_stop_completion(struct aws_event_loop *loop) {
    (void)loop;
    return AWS_OP_SUCCESS;
}
/* Queue a task on the mock scheduler; it runs when the test drains the loop. */
static void s_testing_loop_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    struct testing_loop *impl = event_loop->impl_data;
    aws_task_scheduler_schedule_now(&impl->scheduler, task);
}
/* Queue a task on the mock scheduler to run at `run_at_nanos` (loop-clock time). */
static void s_testing_loop_schedule_task_future(
    struct aws_event_loop *event_loop,
    struct aws_task *task,
    uint64_t run_at_nanos) {

    struct testing_loop *impl = event_loop->impl_data;
    aws_task_scheduler_schedule_future(&impl->scheduler, task, run_at_nanos);
}
/* Remove a previously scheduled task from the mock scheduler. */
static void s_testing_loop_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    struct testing_loop *impl = event_loop->impl_data;
    aws_task_scheduler_cancel_task(&impl->scheduler, task);
}
static bool s_testing_loop_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct testing_loop *testing_loop = event_loop->impl_data;
return testing_loop->mock_on_callers_thread;
}
/* Tear down the mock loop: scheduler, then the impl, then the base, then the
 * loop itself (order matters: impl_data and alloc are read before frees). */
static void s_testing_loop_destroy(struct aws_event_loop *event_loop) {
    struct testing_loop *testing_loop = event_loop->impl_data;
    aws_task_scheduler_clean_up(&testing_loop->scheduler);
    aws_mem_release(event_loop->alloc, testing_loop);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}
/* Wires the mock loop into the aws_event_loop interface. */
static struct aws_event_loop_vtable s_testing_loop_vtable = {
    .destroy = s_testing_loop_destroy,
    .is_on_callers_thread = s_testing_loop_is_on_callers_thread,
    .run = s_testing_loop_run,
    .schedule_task_now = s_testing_loop_schedule_task_now,
    .schedule_task_future = s_testing_loop_schedule_task_future,
    .cancel_task = s_testing_loop_cancel_task,
    .stop = s_testing_loop_stop,
    .wait_for_stop_completion = s_testing_loop_wait_for_stop_completion,
};
/* Creates a mock event loop: tasks only run when the test pumps the scheduler;
 * no background thread is started. */
static struct aws_event_loop *s_testing_loop_new(struct aws_allocator *allocator, aws_io_clock_fn clock) {
    struct aws_event_loop *event_loop = aws_mem_acquire(allocator, sizeof(struct aws_event_loop));
    aws_event_loop_init_base(event_loop, allocator, clock);

    struct testing_loop *testing_loop = aws_mem_calloc(allocator, 1, sizeof(struct testing_loop));
    aws_task_scheduler_init(&testing_loop->scheduler, allocator);
    /* default: pretend callers are on the loop's thread */
    testing_loop->mock_on_callers_thread = true;
    event_loop->impl_data = testing_loop;
    event_loop->vtable = &s_testing_loop_vtable;

    return event_loop;
}
/* Invoked when the testing handler observes a shutdown in either direction. */
typedef void(testing_channel_handler_on_shutdown_fn)(
    enum aws_channel_direction dir,
    int error_code,
    bool free_scarce_resources_immediately,
    void *user_data);

/* Mock channel handler that records the messages and window updates it sees. */
struct testing_channel_handler {
    /* messages received via process_read/write_message, in arrival order */
    struct aws_linked_list messages;
    /* size passed to the most recent increment_read_window() call */
    size_t latest_window_update;
    size_t initial_window;
    /* if true, write completions fire immediately with complete_write_error_code */
    bool complete_write_immediately;
    int complete_write_error_code;
    /* optional test callback fired on shutdown */
    testing_channel_handler_on_shutdown_fn *on_shutdown;
    void *on_shutdown_user_data;
    struct aws_crt_statistics_socket stats;
};
/* Records every read message on the handler's queue so tests can inspect it.
 * (Removed the stale `(void)handler;` and `(void)message;` casts: both
 * parameters ARE used below, so marking them unused was misleading.) */
static int s_testing_channel_handler_process_read_message(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    struct aws_io_message *message) {

    (void)slot;

    struct testing_channel_handler *testing_handler = handler->impl;
    aws_linked_list_push_back(&testing_handler->messages, &message->queueing_handle);
    return AWS_OP_SUCCESS;
}
/* Queues the written message for test inspection and, when configured, fires
 * its completion callback immediately (left-most handler only).
 * (Removed the stale `(void)slot;` cast: slot IS dereferenced below, so
 * marking it unused was misleading.) */
static int s_testing_channel_handler_process_write_message(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    struct aws_io_message *message) {

    struct testing_channel_handler *testing_handler = handler->impl;
    aws_linked_list_push_back(&testing_handler->messages, &message->queueing_handle);

    /* Invoke completion callback if this is the left-most handler */
    if (message->on_completion && !slot->adj_left && testing_handler->complete_write_immediately) {
        message->on_completion(slot->channel, message, testing_handler->complete_write_error_code, message->user_data);
        message->on_completion = NULL;
    }

    return AWS_OP_SUCCESS;
}
/* Remembers the most recent window update so tests can assert on it. */
static int s_testing_channel_handler_increment_read_window(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    size_t size) {

    (void)slot;

    struct testing_channel_handler *impl = handler->impl;
    impl->latest_window_update = size;
    return AWS_OP_SUCCESS;
}
/* Shutdown handling: optionally notifies the test's callback, and when the
 * write direction shuts down on the left-most handler, completes every
 * still-queued message's callback with AWS_IO_SOCKET_CLOSED. */
static int s_testing_channel_handler_shutdown(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    enum aws_channel_direction dir,
    int error_code,
    bool free_scarce_resources_immediately) {

    struct testing_channel_handler *testing_handler = handler->impl;

    /* If user has registered a callback, invoke it */
    if (testing_handler->on_shutdown) {
        testing_handler->on_shutdown(
            dir, error_code, free_scarce_resources_immediately, testing_handler->on_shutdown_user_data);
    }

    if (dir == AWS_CHANNEL_DIR_WRITE) {
        if (!slot->adj_left) {
            /* Invoke the on_completion callbacks for any queued messages */
            struct aws_linked_list_node *node = aws_linked_list_begin(&testing_handler->messages);
            while (node != aws_linked_list_end(&testing_handler->messages)) {
                struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
                if (msg->on_completion) {
                    msg->on_completion(slot->channel, msg, AWS_IO_SOCKET_CLOSED, msg->user_data);
                    msg->on_completion = NULL;
                }
                node = aws_linked_list_next(node);
            }
        }
    }

    return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
}
/* Reports the window size this handler was configured with at creation. */
static size_t s_testing_channel_handler_initial_window_size(struct aws_channel_handler *handler) {
    struct testing_channel_handler *impl = handler->impl;
    return impl->initial_window;
}
/* The testing handler adds no per-message framing overhead. */
static size_t s_testing_channel_handler_message_overhead(struct aws_channel_handler *handler) {
    (void)handler;

    return 0;
}
/* Frees the handler, first releasing any messages still queued in it. */
static void s_testing_channel_handler_destroy(struct aws_channel_handler *handler) {
    struct testing_channel_handler *impl = handler->impl;

    /* drain and release every queued message before tearing down the handler */
    for (;;) {
        if (aws_linked_list_empty(&impl->messages)) {
            break;
        }
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->messages);
        struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
        aws_mem_release(msg->allocator, msg);
    }

    aws_mem_release(handler->alloc, impl);
    aws_mem_release(handler->alloc, handler);
}
/* Zeroes out the handler's accumulated socket statistics. */
static void s_testing_channel_handler_reset_statistics(struct aws_channel_handler *handler) {
    struct testing_channel_handler *impl = handler->impl;
    aws_crt_statistics_socket_reset(&impl->stats);
}
/* Appends a pointer to this handler's stats block onto the caller's list. */
static void s_testing_channel_handler_gather_statistics(
    struct aws_channel_handler *handler,
    struct aws_array_list *stats) {

    struct testing_channel_handler *impl = handler->impl;
    void *stats_entry = &impl->stats;
    aws_array_list_push_back(stats, &stats_entry);
}
/* vtable wiring the testing handler into the aws_channel_handler interface */
static struct aws_channel_handler_vtable s_testing_channel_handler_vtable = {
    .process_read_message = s_testing_channel_handler_process_read_message,
    .process_write_message = s_testing_channel_handler_process_write_message,
    .increment_read_window = s_testing_channel_handler_increment_read_window,
    .shutdown = s_testing_channel_handler_shutdown,
    .initial_window_size = s_testing_channel_handler_initial_window_size,
    .message_overhead = s_testing_channel_handler_message_overhead,
    .destroy = s_testing_channel_handler_destroy,
    .gather_statistics = s_testing_channel_handler_gather_statistics,
    .reset_statistics = s_testing_channel_handler_reset_statistics,
};
/* Allocates a testing channel handler and its impl, wired to the testing
 * vtable. By default, writes complete immediately with AWS_ERROR_SUCCESS. */
static struct aws_channel_handler *s_new_testing_channel_handler(
    struct aws_allocator *allocator,
    size_t initial_window) {

    struct aws_channel_handler *handler = aws_mem_calloc(allocator, 1, sizeof(struct aws_channel_handler));
    struct testing_channel_handler *impl = aws_mem_calloc(allocator, 1, sizeof(struct testing_channel_handler));

    aws_linked_list_init(&impl->messages);
    impl->initial_window = initial_window;
    impl->latest_window_update = 0;
    impl->complete_write_immediately = true;
    impl->complete_write_error_code = AWS_ERROR_SUCCESS;

    handler->alloc = allocator;
    handler->vtable = &s_testing_channel_handler_vtable;
    handler->impl = impl;
    return handler;
}
/* A channel running on a mocked event loop, with a testing handler installed
 * at the left end (and optionally another at the right end) so tests can
 * inspect the messages a handler-under-test sends in either direction. */
struct testing_channel {
    struct aws_event_loop *loop; /* mocked loop created by s_testing_loop_new() */
    struct testing_loop *loop_impl;
    struct aws_channel *channel;
    struct testing_channel_handler *left_handler_impl;
    /* NULL until testing_channel_install_downstream_handler() is called */
    struct testing_channel_handler *right_handler_impl;
    struct aws_channel_slot *left_handler_slot;
    struct aws_channel_slot *right_handler_slot;

    /* optional user callback invoked when channel shutdown completes */
    void (*channel_shutdown)(int error_code, void *user_data);
    void *channel_shutdown_user_data;

    bool channel_setup_completed;
    bool channel_shutdown_completed;
    int channel_shutdown_error_code; /* valid once channel_shutdown_completed is true */
};
/* Channel-setup callback: merely records that setup finished. */
static void s_testing_channel_on_setup_completed(struct aws_channel *channel, int error_code, void *user_data) {
    (void)channel;
    (void)error_code;

    struct testing_channel *testing_channel_ptr = user_data;
    testing_channel_ptr->channel_setup_completed = true;
}
static void s_testing_channel_on_shutdown_completed(struct aws_channel *channel, int error_code, void *user_data) {
(void)channel;
(void)error_code;
struct testing_channel *testing = user_data;
testing->channel_shutdown_completed = true;
testing->channel_shutdown_error_code = error_code;
if (testing->channel_shutdown) {
testing->channel_shutdown(error_code, testing->channel_shutdown_user_data);
}
}
/** API for testing, use this for testing purely your channel handlers and nothing else. Because of that, the s_
* convention isn't used on the functions (since they're intended for you to call). */
/** when you want to test the read path of your handler, call this with the message you want it to read.
 * Ownership of the message transfers to the channel on success. */
static inline int testing_channel_push_read_message(struct testing_channel *testing, struct aws_io_message *message) {
    struct aws_channel_slot *slot = testing->left_handler_slot;
    return aws_channel_slot_send_message(slot, message, AWS_CHANNEL_DIR_READ);
}
/** when you want to test the write path of your handler, call this with the message you want it to write.
 * A downstream handler must have been installed */
static inline int testing_channel_push_write_message(struct testing_channel *testing, struct aws_io_message *message) {
    struct aws_channel_slot *slot = testing->right_handler_slot;
    ASSERT_NOT_NULL(slot);
    return aws_channel_slot_send_message(slot, message, AWS_CHANNEL_DIR_WRITE);
}
/** when you want to test the write output of your handler, call this, get the queue and iterate the messages. */
static inline struct aws_linked_list *testing_channel_get_written_message_queue(struct testing_channel *testing) {
return &testing->left_handler_impl->messages;
}
/** Set whether written messages have their on_complete callbacks invoked immediately.
 * The on_complete callback will be cleared after it is invoked. */
static inline void testing_channel_complete_written_messages_immediately(
    struct testing_channel *testing,
    bool complete_immediately,
    int complete_error_code) {

    struct testing_channel_handler *left = testing->left_handler_impl;
    left->complete_write_immediately = complete_immediately;
    left->complete_write_error_code = complete_error_code;
}
/** when you want to test the read output of your handler, call this, get the queue and iterate the messages.
* A downstream handler must have been installed */
static inline struct aws_linked_list *testing_channel_get_read_message_queue(struct testing_channel *testing) {
AWS_ASSERT(testing->right_handler_impl);
return &testing->right_handler_impl->messages;
}
/** When you want to see what the latest window update issues from your channel handler was, call this. */
static inline size_t testing_channel_last_window_update(struct testing_channel *testing) {
return testing->left_handler_impl->latest_window_update;
}
/** When you want the downstream handler to issue a window update */
static inline int testing_channel_increment_read_window(struct testing_channel *testing, size_t size) {
    struct aws_channel_slot *slot = testing->right_handler_slot;
    ASSERT_NOT_NULL(slot);
    return aws_channel_slot_increment_read_window(slot, size);
}
/** Executes all currently scheduled tasks whose time has come.
 * Use testing_channel_drain_queued_tasks() to repeatedly run tasks until only future-tasks remain.
 */
static inline void testing_channel_run_currently_queued_tasks(struct testing_channel *testing) {
    AWS_ASSERT(aws_channel_thread_is_callers_thread(testing->channel));

    uint64_t current_time = 0;
    aws_event_loop_current_clock_time(testing->loop, &current_time);
    aws_task_scheduler_run_all(&testing->loop_impl->scheduler, current_time);
}
/** Repeatedly executes scheduled tasks until only those in the future remain.
* This covers the common case where there's a chain reaction of now-tasks scheduling further now-tasks.
*/
static inline void testing_channel_drain_queued_tasks(struct testing_channel *testing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(testing->channel));
uint64_t now = 0;
uint64_t next_task_time = 0;
size_t count = 0;
while (true) {
aws_event_loop_current_clock_time(testing->loop, &now);
if (aws_task_scheduler_has_tasks(&testing->loop_impl->scheduler, &next_task_time) && (next_task_time <= now)) {
aws_task_scheduler_run_all(&testing->loop_impl->scheduler, now);
} else {
break;
}
/* NOTE: This will loop infinitely if there's a task the perpetually re-schedules another task.
* Consider capping the number of loops if we want to support that behavior. */
if ((++count % 1000) == 0) {
AWS_LOGF_WARN(
AWS_LS_IO_CHANNEL,
"id=%p: testing_channel_drain_queued_tasks() has looped %zu times.",
(void *)testing->channel,
count);
}
}
}
/** When you want to force the "not on channel thread path" for your handler, set 'on_users_thread' to false.
 * when you want to undo that, set it back to true. If you set it to false, you'll need to call
 * 'testing_channel_execute_queued_tasks()' to invoke the tasks that ended up being scheduled. */
static inline void testing_channel_set_is_on_users_thread(struct testing_channel *testing, bool on_users_thread) {
    struct testing_loop *loop_impl = testing->loop_impl;
    loop_impl->mock_on_callers_thread = on_users_thread;
}
/** Options for testing_channel_init(). */
struct aws_testing_channel_options {
    /* clock function used by the testing event loop (passed to s_testing_loop_new()) */
    aws_io_clock_fn *clock_fn;
};
/** Initializes a testing channel: creates a mock event loop, creates a channel
 * on it, drains tasks until channel setup completes, then installs a testing
 * handler (16 KiB initial window) at the left end.
 * Returns AWS_OP_SUCCESS; fails via ASSERT macros on error. */
static inline int testing_channel_init(
    struct testing_channel *testing,
    struct aws_allocator *allocator,
    struct aws_testing_channel_options *options) {
    AWS_ZERO_STRUCT(*testing);
    testing->loop = s_testing_loop_new(allocator, options->clock_fn);
    testing->loop_impl = testing->loop->impl_data;
    struct aws_channel_options args = {
        .on_setup_completed = s_testing_channel_on_setup_completed,
        .on_shutdown_completed = s_testing_channel_on_shutdown_completed,
        .setup_user_data = testing,
        .shutdown_user_data = testing,
        .event_loop = testing->loop,
        .enable_read_back_pressure = true,
    };
    testing->channel = aws_channel_new(allocator, &args);
    /* Wait for channel to finish setup (setup runs as scheduled tasks) */
    testing_channel_drain_queued_tasks(testing);
    ASSERT_TRUE(testing->channel_setup_completed);
    testing->left_handler_slot = aws_channel_slot_new(testing->channel);
    struct aws_channel_handler *handler = s_new_testing_channel_handler(allocator, 16 * 1024);
    testing->left_handler_impl = handler->impl;
    /* slot takes ownership of the handler here */
    ASSERT_SUCCESS(aws_channel_slot_set_handler(testing->left_handler_slot, handler));
    return AWS_OP_SUCCESS;
}
/** Shuts the channel down, drains tasks until shutdown completes, then
 * destroys the channel and the mock event loop. */
static inline int testing_channel_clean_up(struct testing_channel *testing) {
    aws_channel_shutdown(testing->channel, AWS_ERROR_SUCCESS);
    /* Wait for channel to finish shutdown (shutdown runs as scheduled tasks) */
    testing_channel_drain_queued_tasks(testing);
    ASSERT_TRUE(testing->channel_shutdown_completed);
    aws_channel_destroy(testing->channel);
    /* event_loop can't be destroyed from its own thread */
    testing_channel_set_is_on_users_thread(testing, false);
    aws_event_loop_destroy(testing->loop);
    return AWS_OP_SUCCESS;
}
/** When you want to test your handler with a downstream handler installed to the right.
 * Creates a new slot at the channel's right end and installs a fresh testing
 * handler (with the given initial window) into it. May only be called once. */
static inline int testing_channel_install_downstream_handler(struct testing_channel *testing, size_t initial_window) {
    /* only one downstream handler may be installed */
    ASSERT_NULL(testing->right_handler_slot);
    testing->right_handler_slot = aws_channel_slot_new(testing->channel);
    ASSERT_NOT_NULL(testing->right_handler_slot);
    /* slot must be linked into the channel before a handler is set on it */
    ASSERT_SUCCESS(aws_channel_slot_insert_end(testing->channel, testing->right_handler_slot));
    struct aws_channel_handler *handler =
        s_new_testing_channel_handler(testing->left_handler_slot->alloc, initial_window);
    ASSERT_NOT_NULL(handler);
    testing->right_handler_impl = handler->impl;
    ASSERT_SUCCESS(aws_channel_slot_set_handler(testing->right_handler_slot, handler));
    return AWS_OP_SUCCESS;
}
/** Return whether channel is completely shut down */
static inline bool testing_channel_is_shutdown_completed(const struct testing_channel *testing) {
    bool is_complete = testing->channel_shutdown_completed;
    return is_complete;
}
/** Return channel's shutdown error_code.
 * Only valid after shutdown has completed. */
static inline int testing_channel_get_shutdown_error_code(const struct testing_channel *testing) {
    AWS_ASSERT(testing->channel_shutdown_completed);

    return testing->channel_shutdown_error_code;
}
/**
 * Set a callback which is invoked during the handler's shutdown,
 * once in the read direction and again in the write direction.
 * Use this to inject actions that might occur in the middle of channel shutdown.
 */
static inline void testing_channel_set_downstream_handler_shutdown_callback(
    struct testing_channel *testing,
    testing_channel_handler_on_shutdown_fn *on_shutdown,
    void *user_data) {

    struct testing_channel_handler *right = testing->right_handler_impl;
    AWS_ASSERT(right);
    right->on_shutdown = on_shutdown;
    right->on_shutdown_user_data = user_data;
}
/* Pop first message from queue and compare its contents to expected data.
 * The message is released after the comparison. */
static inline int testing_channel_check_written_message(
    struct testing_channel *channel,
    struct aws_byte_cursor expected) {

    struct aws_linked_list *written = testing_channel_get_written_message_queue(channel);
    ASSERT_TRUE(!aws_linked_list_empty(written));

    struct aws_linked_list_node *first = aws_linked_list_pop_front(written);
    struct aws_io_message *msg = AWS_CONTAINER_OF(first, struct aws_io_message, queueing_handle);
    ASSERT_BIN_ARRAYS_EQUALS(expected.ptr, expected.len, msg->message_data.buffer, msg->message_data.len);

    aws_mem_release(msg->allocator, msg);
    return AWS_OP_SUCCESS;
}
/* Pop first message from queue and compare its contents to expected data. */
static inline int testing_channel_check_written_message_str(struct testing_channel *channel, const char *expected) {
return testing_channel_check_written_message(channel, aws_byte_cursor_from_c_str(expected));
}
/* Copies all messages in a list into a buffer, cleans up messages.
 * Fix: the return value of aws_byte_buf_append_dynamic() was silently
 * ignored; a failed append now fails the test (the message is released
 * first so it isn't leaked). */
static inline int testing_channel_drain_messages(struct aws_linked_list *msgs, struct aws_byte_buf *buffer) {
    while (!aws_linked_list_empty(msgs)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(msgs);
        struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
        struct aws_byte_cursor msg_cursor = aws_byte_cursor_from_buf(&msg->message_data);
        int append_err = aws_byte_buf_append_dynamic(buffer, &msg_cursor);
        aws_mem_release(msg->allocator, msg);
        ASSERT_SUCCESS(append_err);
    }
    return AWS_OP_SUCCESS;
}
/* Pop all messages from queue and compare their contents to expected data */
static inline int testing_channel_check_messages_ex(
struct aws_linked_list *msgs,
struct aws_allocator *allocator,
struct aws_byte_cursor expected) {
struct aws_byte_buf all_msgs;
ASSERT_SUCCESS(aws_byte_buf_init(&all_msgs, allocator, 1024));
ASSERT_SUCCESS(testing_channel_drain_messages(msgs, &all_msgs));
ASSERT_BIN_ARRAYS_EQUALS(expected.ptr, expected.len, all_msgs.buffer, all_msgs.len);
aws_byte_buf_clean_up(&all_msgs);
return AWS_OP_SUCCESS;
}
/* Check contents of all messages sent in the write direction. */
static inline int testing_channel_check_written_messages(
    struct testing_channel *channel,
    struct aws_allocator *allocator,
    struct aws_byte_cursor expected) {

    struct aws_linked_list *written = testing_channel_get_written_message_queue(channel);
    return testing_channel_check_messages_ex(written, allocator, expected);
}
/* Check contents of all messages sent in the write direction. */
static inline int testing_channel_check_written_messages_str(
struct testing_channel *channel,
struct aws_allocator *allocator,
const char *expected) {
return testing_channel_check_written_messages(channel, allocator, aws_byte_cursor_from_c_str(expected));
}
/* Extract contents of all messages sent in the write direction. */
static inline int testing_channel_drain_written_messages(struct testing_channel *channel, struct aws_byte_buf *output) {
    struct aws_linked_list *written = testing_channel_get_written_message_queue(channel);
    ASSERT_SUCCESS(testing_channel_drain_messages(written, output));

    return AWS_OP_SUCCESS;
}
/* Check contents of all read-messages sent in the read direction by a midchannel http-handler */
static inline int testing_channel_check_midchannel_read_messages(
    struct testing_channel *channel,
    struct aws_allocator *allocator,
    struct aws_byte_cursor expected) {

    struct aws_linked_list *read_msgs = testing_channel_get_read_message_queue(channel);
    return testing_channel_check_messages_ex(read_msgs, allocator, expected);
}
/* Check contents of all read-messages sent in the read direction by a midchannel http-handler */
static inline int testing_channel_check_midchannel_read_messages_str(
struct testing_channel *channel,
struct aws_allocator *allocator,
const char *expected) {
return testing_channel_check_midchannel_read_messages(channel, allocator, aws_byte_cursor_from_c_str(expected));
}
/* For sending an aws_io_message into the channel, in the write or read direction.
 * Acquires a message from the channel's pool, copies `data` into it, and sends
 * it via the appropriate push helper. On send failure the message is released
 * here (on success the recipient owns it). If ignore_send_message_errors is
 * true, a send failure does not fail the test (useful during shutdown). */
static inline int testing_channel_send_data(
    struct testing_channel *channel,
    struct aws_byte_cursor data,
    enum aws_channel_direction dir,
    bool ignore_send_message_errors) {
    struct aws_io_message *msg =
        aws_channel_acquire_message_from_pool(channel->channel, AWS_IO_MESSAGE_APPLICATION_DATA, data.len);
    ASSERT_NOT_NULL(msg);
    ASSERT_TRUE(aws_byte_buf_write_from_whole_cursor(&msg->message_data, data));
    int err;
    if (dir == AWS_CHANNEL_DIR_READ) {
        err = testing_channel_push_read_message(channel, msg);
    } else {
        err = testing_channel_push_write_message(channel, msg);
    }
    if (err) {
        /* If an error happens, clean the message here. Else, the recipient of the message will take the ownership */
        aws_mem_release(msg->allocator, msg);
    }
    if (!ignore_send_message_errors) {
        ASSERT_SUCCESS(err);
    }
    return AWS_OP_SUCCESS;
}
/** Create an aws_io_message, containing the following data, and pushes it up the channel in the read direction */
static inline int testing_channel_push_read_data(struct testing_channel *channel, struct aws_byte_cursor data) {
    return testing_channel_send_data(channel, data, AWS_CHANNEL_DIR_READ, false /*ignore_send_message_errors*/);
}
/** Create an aws_io_message, containing the following data, and pushes it up the channel in the read direction */
static inline int testing_channel_push_read_str(struct testing_channel *channel, const char *str) {
return testing_channel_send_data(channel, aws_byte_cursor_from_c_str(str), AWS_CHANNEL_DIR_READ, false);
}
/** Create an aws_io_message, containing the following data.
* Tries to push it up the channel in the read direction, but don't assert if the message can't be sent.
* Useful for testing data that arrives during handler shutdown */
static inline int testing_channel_push_read_str_ignore_errors(struct testing_channel *channel, const char *str) {
return testing_channel_send_data(channel, aws_byte_cursor_from_c_str(str), AWS_CHANNEL_DIR_READ, true);
}
/** Create an aws_io_message, containing the following data, and pushes it up the channel in the write direction */
static inline int testing_channel_push_write_data(struct testing_channel *channel, struct aws_byte_cursor data) {
    return testing_channel_send_data(channel, data, AWS_CHANNEL_DIR_WRITE, false /*ignore_send_message_errors*/);
}
/** Create an aws_io_message, containing the following data, and pushes it up the channel in the write direction */
static inline int testing_channel_push_write_str(struct testing_channel *channel, const char *str) {
return testing_channel_send_data(channel, aws_byte_cursor_from_c_str(str), AWS_CHANNEL_DIR_WRITE, false);
}
#endif /* AWS_TESTING_IO_TESTING_CHANNEL_H */

/* ======================================================================== */
/* Next file: aws/testing/stream_tester.h                                   */
/* ======================================================================== */
#ifndef AWS_TESTING_STREAM_TESTER_H
#define AWS_TESTING_STREAM_TESTER_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/io/stream.h>
#ifndef AWS_UNSTABLE_TESTING_API
# error This code is designed for use by AWS owned libraries for the AWS C99 SDK. \
You are welcome to use it, but we make no promises on the stability of this API. \
To enable use of this code, set the AWS_UNSTABLE_TESTING_API compiler flag.
#endif
/**
* Use aws_input_stream tester to test edge cases in systems that take input streams.
* You can make it behave in specific weird ways (e.g. fail on 3rd read).
*
* There are a few ways to set what gets streamed.
* - source_bytes: if set, stream these bytes.
* - source_stream: if set, wrap this stream (but insert weird behavior like failing on 3rd read).
* - autogen_length: autogen streaming content N bytes in length.
*/
enum aws_autogen_style {
AWS_AUTOGEN_LOREM_IPSUM,
AWS_AUTOGEN_ALPHABET,
AWS_AUTOGEN_NUMBERS,
};
/* Options for aws_input_stream_new_tester().
 * At most one of source_bytes / source_stream / autogen_length may be set;
 * aws_input_stream_new_tester() fatal-asserts if they're combined. */
struct aws_input_stream_tester_options {
    /* bytes to be streamed.
     * the stream copies these to its own internal buffer.
     * or you can set the autogen_length */
    struct aws_byte_cursor source_bytes;

    /* wrap another stream */
    struct aws_input_stream *source_stream;

    /* if non-zero, autogen streaming content N bytes in length */
    size_t autogen_length;

    /* style of contents (if using autogen) */
    enum aws_autogen_style autogen_style;

    /* if non-zero, read at most N bytes per read() */
    size_t max_bytes_per_read;

    /* if non-zero, read 0 bytes the Nth time read() is called */
    size_t read_zero_bytes_on_nth_read;

    /* If false, EOF is reported by the read() which produces the last few bytes.
     * If true, EOF isn't reported until there's one more read(), producing zero bytes.
     * This emulates an underlying stream that reports EOF by reading 0 bytes */
    bool eof_requires_extra_read;

    /* if non-zero, fail the Nth time read() is called, raising `fail_with_error_code` */
    size_t fail_on_nth_read;

    /* error-code to raise if failing on purpose */
    int fail_with_error_code;
};
/* Internal state of the input-stream tester.
 * Fix: num_bytes_last_read was declared `bool` although it holds a byte count
 * (it is assigned from a size_t in s_input_stream_tester_read() and its own
 * comment says "number of bytes"); it is now size_t. Behavior of the existing
 * `> 0` check in s_input_stream_tester_get_status() is unchanged. */
struct aws_input_stream_tester {
    struct aws_input_stream base;
    struct aws_allocator *alloc;
    struct aws_input_stream_tester_options options;

    /* owned copy of source_bytes, or autogenned content (unused when wrapping source_stream) */
    struct aws_byte_buf source_buf;
    /* the stream actually read from: either the wrapped source_stream or one over source_buf */
    struct aws_input_stream *source_stream;

    size_t read_count;           /* number of times read() has been called */
    size_t num_bytes_last_read;  /* number of bytes read in the most recent successful read() */
    uint64_t total_bytes_read;
};
/* Forwards seek straight through to the wrapped source stream. */
static inline int s_input_stream_tester_seek(
    struct aws_input_stream *stream,
    int64_t offset,
    enum aws_stream_seek_basis basis) {

    struct aws_input_stream_tester *impl = (struct aws_input_stream_tester *)stream->impl;
    struct aws_input_stream *wrapped = impl->source_stream;
    return aws_input_stream_seek(wrapped, offset, basis);
}
/* read() vtable entry: applies the configured misbehaviors (fail on Nth read,
 * zero-byte Nth read, max bytes per read), then reads from the wrapped stream
 * into a size-capped view of the caller's buffer and updates counters. */
static inline int s_input_stream_tester_read(struct aws_input_stream *stream, struct aws_byte_buf *original_dest) {
    struct aws_input_stream_tester *impl = (struct aws_input_stream_tester *)stream->impl;
    impl->read_count++;

    /* if we're configured to fail, then do it */
    if (impl->read_count == impl->options.fail_on_nth_read) {
        AWS_FATAL_ASSERT(impl->options.fail_with_error_code != 0);
        return aws_raise_error(impl->options.fail_with_error_code);
    }

    /* cap how much is read, if that's how we're configured */
    size_t bytes_to_read = original_dest->capacity - original_dest->len;
    if (impl->options.max_bytes_per_read != 0) {
        bytes_to_read = aws_min_size(bytes_to_read, impl->options.max_bytes_per_read);
    }
    if (impl->read_count == impl->options.read_zero_bytes_on_nth_read) {
        bytes_to_read = 0;
    }

    /* pass artificially capped buffer to actual stream; it aliases the unused
     * tail of original_dest so no copy is needed afterwards */
    struct aws_byte_buf capped_buf =
        aws_byte_buf_from_empty_array(original_dest->buffer + original_dest->len, bytes_to_read);
    if (aws_input_stream_read(impl->source_stream, &capped_buf)) {
        return AWS_OP_ERR;
    }

    size_t bytes_actually_read = capped_buf.len;
    original_dest->len += bytes_actually_read;
    impl->num_bytes_last_read = bytes_actually_read;
    impl->total_bytes_read += bytes_actually_read;
    return AWS_OP_SUCCESS;
}
static inline int s_input_stream_tester_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
struct aws_input_stream_tester *impl = (struct aws_input_stream_tester *)stream->impl;
if (aws_input_stream_get_status(impl->source_stream, status)) {
return AWS_OP_ERR;
}
/* if we're emulating a stream that requires an additional 0 byte read to realize it's EOF */
if (impl->options.eof_requires_extra_read) {
if (impl->num_bytes_last_read > 0) {
status->is_end_of_stream = false;
}
}
return AWS_OP_SUCCESS;
}
/* Forwards get_length straight through to the wrapped source stream. */
static inline int s_input_stream_tester_get_length(struct aws_input_stream *stream, int64_t *out_length) {
    struct aws_input_stream_tester *impl = (struct aws_input_stream_tester *)stream->impl;
    struct aws_input_stream *wrapped = impl->source_stream;
    return aws_input_stream_get_length(wrapped, out_length);
}
/* vtable wiring the tester into the aws_input_stream interface */
static struct aws_input_stream_vtable s_input_stream_tester_vtable = {
    .seek = s_input_stream_tester_seek,
    .read = s_input_stream_tester_read,
    .get_status = s_input_stream_tester_get_status,
    .get_length = s_input_stream_tester_get_length,
};
/* init byte-buf and fill it with autogenned content.
 * Fix: an unrecognized `style` previously left `pattern` empty, making the
 * fill loop below spin forever without ever writing a byte; now fatal-asserts
 * that a pattern was selected. */
static inline void s_byte_buf_init_autogenned(
    struct aws_byte_buf *buf,
    struct aws_allocator *alloc,
    size_t length,
    enum aws_autogen_style style) {

    aws_byte_buf_init(buf, alloc, length);
    struct aws_byte_cursor pattern = {0};
    switch (style) {
        case AWS_AUTOGEN_LOREM_IPSUM:
            pattern = aws_byte_cursor_from_c_str(
                "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore "
                "et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
                "aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse "
                "cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa "
                "qui officia deserunt mollit anim id est laborum. ");
            break;
        case AWS_AUTOGEN_ALPHABET:
            pattern = aws_byte_cursor_from_c_str("abcdefghijklmnopqrstuvwxyz");
            break;
        case AWS_AUTOGEN_NUMBERS:
            pattern = aws_byte_cursor_from_c_str("1234567890");
            break;
    }

    /* an empty pattern would make the loop below never terminate */
    AWS_FATAL_ASSERT(pattern.len > 0);

    /* tile the pattern until the buffer is full, restarting it as it runs out */
    struct aws_byte_cursor pattern_cursor = {0};
    while (buf->len < buf->capacity) {
        if (pattern_cursor.len == 0) {
            pattern_cursor = pattern;
        }
        aws_byte_buf_write_to_capacity(buf, &pattern_cursor);
    }
}
/* Returns the total number of bytes read through the tester so far. */
static inline uint64_t aws_input_stream_tester_total_bytes_read(const struct aws_input_stream *stream) {
    const struct aws_input_stream_tester *tester = (const struct aws_input_stream_tester *)stream->impl;
    uint64_t total = tester->total_bytes_read;
    return total;
}
/* Ref-count destructor: releases the wrapped stream, then the owned buffer
 * and the tester itself. */
static inline void s_input_stream_tester_destroy(void *user_data) {
    struct aws_input_stream_tester *tester = (struct aws_input_stream_tester *)user_data;
    aws_input_stream_release(tester->source_stream);
    aws_byte_buf_clean_up(&tester->source_buf);
    aws_mem_release(tester->alloc, tester);
}
/* Creates an input-stream tester. Exactly one content source may be set in
 * `options` (source_stream, autogen_length, or source_bytes); combinations
 * are rejected with fatal asserts. The returned stream is ref-counted and
 * cleaned up via s_input_stream_tester_destroy(). */
static inline struct aws_input_stream *aws_input_stream_new_tester(
    struct aws_allocator *alloc,
    const struct aws_input_stream_tester_options *options) {

    struct aws_input_stream_tester *impl =
        (struct aws_input_stream_tester *)aws_mem_calloc(alloc, 1, sizeof(struct aws_input_stream_tester));

    impl->base.impl = impl;
    impl->base.vtable = &s_input_stream_tester_vtable;
    aws_ref_count_init(&impl->base.ref_count, impl, s_input_stream_tester_destroy);
    impl->alloc = alloc;
    impl->options = *options;

    if (options->source_stream != NULL) {
        /* wrapping an existing stream: the other content sources must be unset */
        AWS_FATAL_ASSERT((options->autogen_length == 0) && (options->source_bytes.len == 0));
        impl->source_stream = aws_input_stream_acquire(options->source_stream);
    } else {
        if (options->autogen_length > 0) {
            AWS_FATAL_ASSERT(options->source_bytes.len == 0);
            s_byte_buf_init_autogenned(&impl->source_buf, alloc, options->autogen_length, options->autogen_style);
        } else {
            /* copy caller's bytes so the tester owns its content */
            aws_byte_buf_init_copy_from_cursor(&impl->source_buf, alloc, options->source_bytes);
        }

        struct aws_byte_cursor source_buf_cursor = aws_byte_cursor_from_buf(&impl->source_buf);
        impl->source_stream = aws_input_stream_new_from_cursor(alloc, &source_buf_cursor);
        AWS_FATAL_ASSERT(impl->source_stream);
    }

    return &impl->base;
}
#endif /* AWS_TESTING_STREAM_TESTER_H */