From 6d8b050eda8d1e782f553880da66f4135f01a685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=82=E3=81=8F?= Date: Mon, 10 Jun 2024 18:53:08 +0100 Subject: [PATCH] [FL-3833] Furi: event loop (#3675) * Furi: epoll prototype * Gui: simplify view_dispatcher custom event processing * Furi: add missing critical sections to epoll * Furi: add epoll unit tests, fully implement level processing for in and out events * Furi: properly trigger epoll item event on adding mq, update tests. * Unit tests: cleanup defines * Furi: protect epoll from modification in callback * Furi: rename epoll into event_loop, cleanup api naming * Sync API Symbols * Furi: add event loop contract and link api, port mq to new api, cleanup code * Format Sources * Furi: cleanup mq and event loop code * Furi: remove unused staff from message queue * ApiSymbols: remove event loop from public APIs. * Fix furi unit tests --------- Co-authored-by: Georgii Surkov Co-authored-by: Georgii Surkov <37121527+gsurkov@users.noreply.github.com> --- .../unit_tests/tests/furi/furi_event_loop.c | 164 ++++++++ .../debug/unit_tests/tests/furi/furi_test.c | 8 +- .../debug/unit_tests/unit_test_api_table_i.h | 16 +- applications/services/gui/view_dispatcher.c | 133 ++++--- applications/services/gui/view_dispatcher.h | 18 +- applications/services/gui/view_dispatcher_i.h | 28 +- furi/core/base.h | 1 + furi/core/event_loop.c | 352 ++++++++++++++++++ furi/core/event_loop.h | 134 +++++++ furi/core/event_loop_i.h | 33 ++ furi/core/message_queue.c | 53 ++- furi/core/message_queue_i.h | 12 + furi/core/stream_buffer.c | 4 +- furi/core/stream_buffer.h | 4 +- furi/furi.h | 1 + targets/f18/api_symbols.csv | 8 + targets/f7/api_symbols.csv | 8 + targets/f7/inc/FreeRTOSConfig.h | 8 +- 18 files changed, 906 insertions(+), 79 deletions(-) create mode 100644 applications/debug/unit_tests/tests/furi/furi_event_loop.c create mode 100644 furi/core/event_loop.c create mode 100644 furi/core/event_loop.h create mode 100644 furi/core/event_loop_i.h create mode 100644 furi/core/message_queue_i.h diff --git a/applications/debug/unit_tests/tests/furi/furi_event_loop.c b/applications/debug/unit_tests/tests/furi/furi_event_loop.c new file mode 100644 index 0000000000..06afc4fd9b --- /dev/null +++ b/applications/debug/unit_tests/tests/furi/furi_event_loop.c @@ -0,0 +1,164 @@ +#include "../test.h" +#include +#include + +#define TAG "TestFuriEventLoop" + +#define EVENT_LOOP_EVENT_COUNT (256u) + +typedef struct { + FuriMessageQueue* mq; + + FuriEventLoop* producer_event_loop; + uint32_t producer_counter; + + FuriEventLoop* consumer_event_loop; + uint32_t consumer_counter; +} TestFuriData; + +bool test_furi_event_loop_producer_mq_callback(FuriMessageQueue* queue, void* context) { + furi_check(context); + + TestFuriData* data = context; + furi_check(data->mq == queue, "Invalid queue"); + + FURI_LOG_I( + TAG, "producer_mq_callback: %lu %lu", data->producer_counter, data->consumer_counter); + + // Remove and add should not cause crash + // if(data->producer_counter == EVENT_LOOP_EVENT_COUNT/2) { + // furi_event_loop_message_queue_remove(data->producer_event_loop, data->mq); + // furi_event_loop_message_queue_add( + // data->producer_event_loop, + // data->mq, + // FuriEventLoopEventOut, + // test_furi_event_loop_producer_mq_callback, + // data); + // } + + if(data->producer_counter == EVENT_LOOP_EVENT_COUNT) { + furi_event_loop_stop(data->producer_event_loop); + return false; + } + + data->producer_counter++; + furi_check( + furi_message_queue_put(data->mq, &data->producer_counter, 0) == FuriStatusOk, + "furi_message_queue_put failed"); + furi_delay_us(furi_hal_random_get() % 1000); + + return true; +} + +int32_t test_furi_event_loop_producer(void* p) { + furi_check(p); + + FURI_LOG_I(TAG, "producer start"); + + TestFuriData* data = p; + + data->producer_event_loop = furi_event_loop_alloc(); + furi_event_loop_message_queue_subscribe( + data->producer_event_loop, + data->mq, + FuriEventLoopEventOut, + test_furi_event_loop_producer_mq_callback, + data); + + furi_event_loop_run(data->producer_event_loop); + + furi_event_loop_message_queue_unsubscribe(data->producer_event_loop, data->mq); + furi_event_loop_free(data->producer_event_loop); + + FURI_LOG_I(TAG, "producer end"); + + return 0; +} + +bool test_furi_event_loop_consumer_mq_callback(FuriMessageQueue* queue, void* context) { + furi_check(context); + + TestFuriData* data = context; + furi_check(data->mq == queue); + + furi_delay_us(furi_hal_random_get() % 1000); + furi_check(furi_message_queue_get(data->mq, &data->consumer_counter, 0) == FuriStatusOk); + + FURI_LOG_I( + TAG, "consumer_mq_callback: %lu %lu", data->producer_counter, data->consumer_counter); + + // Remove and add should not cause crash + // if(data->producer_counter == EVENT_LOOP_EVENT_COUNT/2) { + // furi_event_loop_message_queue_remove(data->consumer_event_loop, data->mq); + // furi_event_loop_message_queue_add( + // data->consumer_event_loop, + // data->mq, + // FuriEventLoopEventIn, + // test_furi_event_loop_producer_mq_callback, + // data); + // } + + if(data->consumer_counter == EVENT_LOOP_EVENT_COUNT) { + furi_event_loop_stop(data->consumer_event_loop); + return false; + } + + return true; +} + +int32_t test_furi_event_loop_consumer(void* p) { + furi_check(p); + + FURI_LOG_I(TAG, "consumer start"); + + TestFuriData* data = p; + + data->consumer_event_loop = furi_event_loop_alloc(); + furi_event_loop_message_queue_subscribe( + data->consumer_event_loop, + data->mq, + FuriEventLoopEventIn, + test_furi_event_loop_consumer_mq_callback, + data); + + furi_event_loop_run(data->consumer_event_loop); + + furi_event_loop_message_queue_unsubscribe(data->consumer_event_loop, data->mq); + furi_event_loop_free(data->consumer_event_loop); + + FURI_LOG_I(TAG, "consumer end"); + + return 0; +} + +void test_furi_event_loop(void) { + TestFuriData data = {}; + + data.mq = furi_message_queue_alloc(16, sizeof(uint32_t)); + + FuriThread* producer_thread = furi_thread_alloc(); + furi_thread_set_name(producer_thread, "producer_thread"); + furi_thread_set_stack_size(producer_thread, 1 * 1024); + furi_thread_set_callback(producer_thread, test_furi_event_loop_producer); + furi_thread_set_context(producer_thread, &data); + furi_thread_start(producer_thread); + + FuriThread* consumer_thread = furi_thread_alloc(); + furi_thread_set_name(consumer_thread, "consumer_thread"); + furi_thread_set_stack_size(consumer_thread, 1 * 1024); + furi_thread_set_callback(consumer_thread, test_furi_event_loop_consumer); + furi_thread_set_context(consumer_thread, &data); + furi_thread_start(consumer_thread); + + // Wait for thread to complete their tasks + furi_thread_join(producer_thread); + furi_thread_join(consumer_thread); + + // The test itself + mu_assert_int_eq(data.producer_counter, data.consumer_counter); + + // Release memory + furi_thread_free(consumer_thread); + furi_thread_free(producer_thread); + furi_message_queue_free(data.mq); +} diff --git a/applications/debug/unit_tests/tests/furi/furi_test.c b/applications/debug/unit_tests/tests/furi/furi_test.c index f08e4aa6b2..be579d2b8c 100644 --- a/applications/debug/unit_tests/tests/furi/furi_test.c +++ b/applications/debug/unit_tests/tests/furi/furi_test.c @@ -6,8 +6,8 @@ void test_furi_create_open(void); void test_furi_concurrent_access(void); void test_furi_pubsub(void); - void test_furi_memmgr(void); +void test_furi_event_loop(void); static int foo = 0; @@ -38,15 +38,19 @@ MU_TEST(mu_test_furi_memmgr) { test_furi_memmgr(); } +MU_TEST(mu_test_furi_event_loop) { + test_furi_event_loop(); +} + MU_TEST_SUITE(test_suite) { MU_SUITE_CONFIGURE(&test_setup, &test_teardown); - MU_RUN_TEST(test_check); // v2 tests MU_RUN_TEST(mu_test_furi_create_open); MU_RUN_TEST(mu_test_furi_pubsub); MU_RUN_TEST(mu_test_furi_memmgr); + MU_RUN_TEST(mu_test_furi_event_loop); } int run_minunit_test_furi(void) { diff --git a/applications/debug/unit_tests/unit_test_api_table_i.h b/applications/debug/unit_tests/unit_test_api_table_i.h index e6409f3ac5..fc659aea90 100644 --- a/applications/debug/unit_tests/unit_test_api_table_i.h +++ b/applications/debug/unit_tests/unit_test_api_table_i.h @@ -6,11 +6,12 @@ #include #include +#include static constexpr auto unit_tests_api_table = sort(create_array_t( API_METHOD(resource_manifest_reader_alloc, ResourceManifestReader*, (Storage*)), API_METHOD(resource_manifest_reader_free, void, (ResourceManifestReader*)), - API_METHOD(resource_manifest_reader_open, bool, (ResourceManifestReader*, const char* filename)), + API_METHOD(resource_manifest_reader_open, bool, (ResourceManifestReader*, const char*)), API_METHOD(resource_manifest_reader_next, ResourceManifestEntry*, (ResourceManifestReader*)), API_METHOD(resource_manifest_reader_previous, ResourceManifestEntry*, (ResourceManifestReader*)), API_METHOD(slix_process_iso15693_3_error, SlixError, (Iso15693_3Error)), @@ -26,4 +27,17 @@ static constexpr auto unit_tests_api_table = sort(create_array_t( xQueueGenericSend, BaseType_t, (QueueHandle_t, const void* const, TickType_t, const BaseType_t)), + API_METHOD(furi_event_loop_alloc, FuriEventLoop*, (void)), + API_METHOD(furi_event_loop_free, void, (FuriEventLoop*)), + API_METHOD( + furi_event_loop_message_queue_subscribe, + void, + (FuriEventLoop*, + FuriMessageQueue*, + FuriEventLoopEvent, + FuriEventLoopMessageQueueCallback, + void*)), + API_METHOD(furi_event_loop_message_queue_unsubscribe, void, (FuriEventLoop*, FuriMessageQueue*)), + API_METHOD(furi_event_loop_run, void, (FuriEventLoop*)), + API_METHOD(furi_event_loop_stop, void, (FuriEventLoop*)), API_VARIABLE(PB_Main_msg, PB_Main_msg_t))); diff --git a/applications/services/gui/view_dispatcher.c b/applications/services/gui/view_dispatcher.c index d4c2f61e79..0d0437736e 100644 --- a/applications/services/gui/view_dispatcher.c +++ b/applications/services/gui/view_dispatcher.c @@ -29,8 +29,18 @@ void view_dispatcher_free(ViewDispatcher* view_dispatcher) { // Free ViewPort view_port_free(view_dispatcher->view_port); // Free internal queue - if(view_dispatcher->queue) { - furi_message_queue_free(view_dispatcher->queue); + if(view_dispatcher->input_queue) { + furi_event_loop_message_queue_unsubscribe( + view_dispatcher->event_loop, view_dispatcher->input_queue); + furi_message_queue_free(view_dispatcher->input_queue); + } + if(view_dispatcher->event_queue) { + furi_event_loop_message_queue_unsubscribe( + view_dispatcher->event_loop, view_dispatcher->event_queue); + furi_message_queue_free(view_dispatcher->event_queue); + } + if(view_dispatcher->event_loop) { + furi_event_loop_free(view_dispatcher->event_loop); } // Free dispatcher free(view_dispatcher); @@ -38,8 +48,25 @@ void view_dispatcher_free(ViewDispatcher* view_dispatcher) { void view_dispatcher_enable_queue(ViewDispatcher* view_dispatcher) { furi_check(view_dispatcher); - furi_check(view_dispatcher->queue == NULL); - view_dispatcher->queue = furi_message_queue_alloc(16, sizeof(ViewDispatcherMessage)); + furi_check(view_dispatcher->event_loop == NULL); + + view_dispatcher->event_loop = furi_event_loop_alloc(); + + view_dispatcher->input_queue = furi_message_queue_alloc(8, sizeof(InputEvent)); + furi_event_loop_message_queue_subscribe( + view_dispatcher->event_loop, + view_dispatcher->input_queue, + FuriEventLoopEventIn, + view_dispatcher_run_input_callback, + view_dispatcher); + + view_dispatcher->event_queue = furi_message_queue_alloc(8, sizeof(uint32_t)); + furi_event_loop_message_queue_subscribe( + view_dispatcher->event_loop, + view_dispatcher->event_queue, + FuriEventLoopEventIn, + view_dispatcher_run_event_callback, + view_dispatcher); } void view_dispatcher_set_event_callback_context(ViewDispatcher* view_dispatcher, void* context) { @@ -70,48 +97,45 @@ void view_dispatcher_set_tick_event_callback( view_dispatcher->tick_period = tick_period; } +FuriEventLoop* view_dispatcher_get_event_loop(ViewDispatcher* view_dispatcher) { + furi_check(view_dispatcher); + furi_check(view_dispatcher->event_loop); + + return view_dispatcher->event_loop; +} + void view_dispatcher_run(ViewDispatcher* view_dispatcher) { furi_check(view_dispatcher); - furi_check(view_dispatcher->queue); + furi_check(view_dispatcher->event_loop); uint32_t tick_period = view_dispatcher->tick_period == 0 ? FuriWaitForever : view_dispatcher->tick_period; - ViewDispatcherMessage message; - while(1) { - if(furi_message_queue_get(view_dispatcher->queue, &message, tick_period) != FuriStatusOk) { - view_dispatcher_handle_tick_event(view_dispatcher); - continue; - } - if(message.type == ViewDispatcherMessageTypeStop) { - break; - } else if(message.type == ViewDispatcherMessageTypeInput) { - view_dispatcher_handle_input(view_dispatcher, &message.input); - } else if(message.type == ViewDispatcherMessageTypeCustomEvent) { - view_dispatcher_handle_custom_event(view_dispatcher, message.custom_event); - } - } + + furi_event_loop_tick_set( + view_dispatcher->event_loop, + tick_period, + view_dispatcher_handle_tick_event, + view_dispatcher); + + furi_event_loop_run(view_dispatcher->event_loop); // Wait till all input events delivered + InputEvent input; while(view_dispatcher->ongoing_input) { - furi_message_queue_get(view_dispatcher->queue, &message, FuriWaitForever); - if(message.type == ViewDispatcherMessageTypeInput) { - uint8_t key_bit = (1 << message.input.key); - if(message.input.type == InputTypePress) { - view_dispatcher->ongoing_input |= key_bit; - } else if(message.input.type == InputTypeRelease) { - view_dispatcher->ongoing_input &= ~key_bit; - } + furi_message_queue_get(view_dispatcher->input_queue, &input, FuriWaitForever); + uint8_t key_bit = (1 << input.key); + if(input.type == InputTypePress) { + view_dispatcher->ongoing_input |= key_bit; + } else if(input.type == InputTypeRelease) { + view_dispatcher->ongoing_input &= ~key_bit; } } } void view_dispatcher_stop(ViewDispatcher* view_dispatcher) { furi_check(view_dispatcher); - furi_check(view_dispatcher->queue); - ViewDispatcherMessage message; - message.type = ViewDispatcherMessageTypeStop; - furi_check( - furi_message_queue_put(view_dispatcher->queue, &message, FuriWaitForever) == FuriStatusOk); + furi_check(view_dispatcher->event_loop); + furi_event_loop_stop(view_dispatcher->event_loop); } void view_dispatcher_add_view(ViewDispatcher* view_dispatcher, uint32_t view_id, View* view) { @@ -218,12 +242,9 @@ void view_dispatcher_draw_callback(Canvas* canvas, void* context) { void view_dispatcher_input_callback(InputEvent* event, void* context) { ViewDispatcher* view_dispatcher = context; - if(view_dispatcher->queue) { - ViewDispatcherMessage message; - message.type = ViewDispatcherMessageTypeInput; - message.input = *event; + if(view_dispatcher->input_queue) { furi_check( - furi_message_queue_put(view_dispatcher->queue, &message, FuriWaitForever) == + furi_message_queue_put(view_dispatcher->input_queue, event, FuriWaitForever) == FuriStatusOk); } else { view_dispatcher_handle_input(view_dispatcher, event); @@ -287,7 +308,8 @@ void view_dispatcher_handle_input(ViewDispatcher* view_dispatcher, InputEvent* e } } -void view_dispatcher_handle_tick_event(ViewDispatcher* view_dispatcher) { +void view_dispatcher_handle_tick_event(void* context) { + ViewDispatcher* view_dispatcher = context; if(view_dispatcher->tick_event_callback) { view_dispatcher->tick_event_callback(view_dispatcher->event_context); } @@ -306,14 +328,11 @@ void view_dispatcher_handle_custom_event(ViewDispatcher* view_dispatcher, uint32 void view_dispatcher_send_custom_event(ViewDispatcher* view_dispatcher, uint32_t event) { furi_check(view_dispatcher); - furi_check(view_dispatcher->queue); - - ViewDispatcherMessage message; - message.type = ViewDispatcherMessageTypeCustomEvent; - message.custom_event = event; + furi_check(view_dispatcher->event_loop); furi_check( - furi_message_queue_put(view_dispatcher->queue, &message, FuriWaitForever) == FuriStatusOk); + furi_message_queue_put(view_dispatcher->event_queue, &event, FuriWaitForever) == + FuriStatusOk); } static const ViewPortOrientation view_dispatcher_view_port_orientation_table[] = { @@ -345,7 +364,7 @@ void view_dispatcher_set_current_view(ViewDispatcher* view_dispatcher, View* vie view_port_update(view_dispatcher->view_port); } else { view_port_enabled_set(view_dispatcher->view_port, false); - if(view_dispatcher->queue) { + if(view_dispatcher->event_loop) { view_dispatcher_stop(view_dispatcher); } } @@ -361,3 +380,27 @@ void view_dispatcher_update(View* view, void* context) { view_port_update(view_dispatcher->view_port); } } + +bool view_dispatcher_run_event_callback(FuriMessageQueue* queue, void* context) { + furi_assert(context); + ViewDispatcher* instance = context; + furi_assert(instance->event_queue == queue); + + uint32_t event; + furi_check(furi_message_queue_get(instance->event_queue, &event, 0) == FuriStatusOk); + view_dispatcher_handle_custom_event(instance, event); + + return true; +} + +bool view_dispatcher_run_input_callback(FuriMessageQueue* queue, void* context) { + furi_assert(context); + ViewDispatcher* instance = context; + furi_assert(instance->input_queue == queue); + + InputEvent input; + furi_check(furi_message_queue_get(instance->input_queue, &input, 0) == FuriStatusOk); + view_dispatcher_handle_input(instance, &input); + + return true; +} diff --git a/applications/services/gui/view_dispatcher.h b/applications/services/gui/view_dispatcher.h index f8567ea1ac..7627e5a0b0 100644 --- a/applications/services/gui/view_dispatcher.h +++ b/applications/services/gui/view_dispatcher.h @@ -47,8 +47,8 @@ void view_dispatcher_free(ViewDispatcher* view_dispatcher); /** Enable queue support * - * If queue enabled all input and custom events will be dispatched throw - * internal queue + * Allocates event_loop, input and event message queues. Must be used with + * `view_dispatcher_run` * * @param view_dispatcher ViewDispatcher instance */ @@ -101,6 +101,20 @@ void view_dispatcher_set_tick_event_callback( */ void view_dispatcher_set_event_callback_context(ViewDispatcher* view_dispatcher, void* context); +/** Get event_loop instance + * + * event_loop instance is allocated on `view_dispatcher_enable_queue` and used + * in view_dispatcher_run. + * + * You can add your objects into event_loop instance, but don't run the loop on + * your side it will cause issues with input processing on dispatcher stop. + * + * @param view_dispatcher ViewDispatcher instance + * + * @return The event_loop instance. + */ +FuriEventLoop* view_dispatcher_get_event_loop(ViewDispatcher* view_dispatcher); + /** Run ViewDispatcher * * Use only after queue enabled diff --git a/applications/services/gui/view_dispatcher_i.h b/applications/services/gui/view_dispatcher_i.h index f30a84e6bd..fcf426c317 100644 --- a/applications/services/gui/view_dispatcher_i.h +++ b/applications/services/gui/view_dispatcher_i.h @@ -5,7 +5,6 @@ #pragma once -#include #include #include "view_dispatcher.h" @@ -15,7 +14,10 @@ DICT_DEF2(ViewDict, uint32_t, M_DEFAULT_OPLIST, View*, M_PTR_OPLIST) struct ViewDispatcher { - FuriMessageQueue* queue; + FuriEventLoop* event_loop; + FuriMessageQueue* input_queue; + FuriMessageQueue* event_queue; + Gui* gui; ViewPort* view_port; ViewDict_t views; @@ -32,20 +34,6 @@ struct ViewDispatcher { void* event_context; }; -typedef enum { - ViewDispatcherMessageTypeInput, - ViewDispatcherMessageTypeCustomEvent, - ViewDispatcherMessageTypeStop, -} ViewDispatcherMessageType; - -typedef struct { - ViewDispatcherMessageType type; - union { - InputEvent input; - uint32_t custom_event; - }; -} ViewDispatcherMessage; - /** ViewPort Draw Callback */ void view_dispatcher_draw_callback(Canvas* canvas, void* context); @@ -56,7 +44,7 @@ void view_dispatcher_input_callback(InputEvent* event, void* context); void view_dispatcher_handle_input(ViewDispatcher* view_dispatcher, InputEvent* event); /** Tick handler */ -void view_dispatcher_handle_tick_event(ViewDispatcher* view_dispatcher); +void view_dispatcher_handle_tick_event(void* context); /** Custom event handler */ void view_dispatcher_handle_custom_event(ViewDispatcher* view_dispatcher, uint32_t event); @@ -66,3 +54,9 @@ void view_dispatcher_set_current_view(ViewDispatcher* view_dispatcher, View* vie /** ViewDispatcher update event */ void view_dispatcher_update(View* view, void* context); + +/** ViewDispatcher run event loop event callback */ +bool view_dispatcher_run_event_callback(FuriMessageQueue* queue, void* context); + +/** ViewDispatcher run event loop input callback */ +bool view_dispatcher_run_input_callback(FuriMessageQueue* queue, void* context); diff --git a/furi/core/base.h b/furi/core/base.h index 642ff2b6cd..92a52a7978 100644 --- a/furi/core/base.h +++ b/furi/core/base.h @@ -2,6 +2,7 @@ #include #include +#include #include #ifdef __cplusplus diff --git a/furi/core/event_loop.c b/furi/core/event_loop.c new file mode 100644 index 0000000000..f228346928 --- /dev/null +++ b/furi/core/event_loop.c @@ -0,0 +1,352 @@ +#include "event_loop_i.h" +#include "message_queue_i.h" + +#include "check.h" +#include "thread.h" + +#include +#include + +#include +#include + +struct FuriEventLoopItem { + // Source + FuriEventLoop* owner; + + // Tracking item + const FuriEventLoopContract* contract; + void* object; + FuriEventLoopEvent event; + + // Callback and context + FuriEventLoopMessageQueueCallback callback; + void* callback_context; + + // Waiting list + ILIST_INTERFACE(WaitingList, struct FuriEventLoopItem); +}; + +ILIST_DEF(WaitingList, FuriEventLoopItem, M_POD_OPLIST) + +static FuriEventLoopItem* furi_event_loop_item_alloc( + FuriEventLoop* owner, + const FuriEventLoopContract* contract, + void* object, + FuriEventLoopEvent event); + +static void furi_event_loop_item_free(FuriEventLoopItem* instance); + +static void furi_event_loop_item_set_callback( + FuriEventLoopItem* instance, + FuriEventLoopMessageQueueCallback callback, + void* callback_context); + +static void furi_event_loop_item_notify(FuriEventLoopItem* instance); + +/* Event Loop RB tree */ +#define FURI_EVENT_LOOP_TREE_RANK (4) + +BPTREE_DEF2( // NOLINT + FuriEventLoopTree, + FURI_EVENT_LOOP_TREE_RANK, + void*, /* pointer to object we track */ + M_PTR_OPLIST, + FuriEventLoopItem*, /* pointer to the FuriEventLoopItem */ + M_PTR_OPLIST) + +#define M_OPL_FuriEventLoopTree_t() BPTREE_OPLIST(FuriEventLoopTree, M_POD_OPLIST) + +#define FURI_EVENT_LOOP_FLAG_NOTIFY_INDEX (2) + +typedef enum { + FuriEventLoopFlagEvent = (1 << 0), + FuriEventLoopFlagStop = (1 << 1), +} FuriEventLoopFlag; + +#define FuriEventLoopFlagAll (FuriEventLoopFlagEvent | FuriEventLoopFlagStop) + +typedef enum { + FuriEventLoopProcessStatusComplete, + FuriEventLoopProcessStatusIncomplete, + FuriEventLoopProcessStatusAgain, +} FuriEventLoopProcessStatus; + +typedef enum { + FuriEventLoopStateIdle, + FuriEventLoopStateProcessing, +} FuriEventLoopState; + +struct FuriEventLoop { + // Only works if all operations are done from the same thread + FuriThreadId thread_id; + + // Poller state + volatile FuriEventLoopState state; + + // Tree + FuriEventLoopTree_t tree; + // Tree waiting list + WaitingList_t waiting_list; + + // Tick event + uint32_t tick_interval; + FuriEventLoopTickCallback tick_callback; + void* tick_callback_context; +}; + +FuriEventLoop* furi_event_loop_alloc(void) { + FuriEventLoop* instance = malloc(sizeof(FuriEventLoop)); + + instance->thread_id = furi_thread_get_current_id(); + FuriEventLoopTree_init(instance->tree); + WaitingList_init(instance->waiting_list); + + return instance; +} + +void furi_event_loop_free(FuriEventLoop* instance) { + furi_check(instance); + furi_check(instance->thread_id == furi_thread_get_current_id()); + + FuriEventLoopTree_clear(instance->tree); + free(instance); +} + +static FuriEventLoopProcessStatus + furi_event_loop_poll_process_event(FuriEventLoop* instance, FuriEventLoopItem* item) { + UNUSED(instance); + + if(!item->contract->get_level(item->object, item->event)) { + return FuriEventLoopProcessStatusComplete; + } + + if(item->callback(item->object, item->callback_context)) { + return FuriEventLoopProcessStatusIncomplete; + } else { + return FuriEventLoopProcessStatusAgain; + } +} + +void furi_event_loop_run(FuriEventLoop* instance) { + furi_check(instance); + furi_check(instance->thread_id == furi_thread_get_current_id()); + + uint32_t timeout = instance->tick_callback ? instance->tick_interval : FuriWaitForever; + + while(true) { + uint32_t flags = 0; + BaseType_t ret = xTaskNotifyWaitIndexed( + FURI_EVENT_LOOP_FLAG_NOTIFY_INDEX, 0, FuriEventLoopFlagAll, &flags, timeout); + + instance->state = FuriEventLoopStateProcessing; + if(ret == pdTRUE) { + if(flags & FuriEventLoopFlagStop) { + instance->state = FuriEventLoopStateIdle; + break; + } else if(flags & FuriEventLoopFlagEvent) { + FuriEventLoopItem* item = NULL; + FURI_CRITICAL_ENTER(); + if(!WaitingList_empty_p(instance->waiting_list)) { + item = WaitingList_pop_front(instance->waiting_list); + WaitingList_init_field(item); + } + FURI_CRITICAL_EXIT(); + if(item) { + while(true) { + FuriEventLoopProcessStatus ret = + furi_event_loop_poll_process_event(instance, item); + if(ret == FuriEventLoopProcessStatusComplete) { + // Event processing complete, break from loop + break; + } else if(ret == FuriEventLoopProcessStatusIncomplete) { + // Event processing incomplete more processing needed + } else if(ret == FuriEventLoopProcessStatusAgain) { //-V547 + furi_event_loop_item_notify(item); + break; + } else { + furi_crash(); + } + } + } + } + } else { + if(instance->tick_callback) { + instance->tick_callback(instance->tick_callback_context); + } + } + instance->state = FuriEventLoopStateIdle; + } +} + +void furi_event_loop_stop(FuriEventLoop* instance) { + furi_check(instance); + furi_check(instance->thread_id == furi_thread_get_current_id()); + + xTaskNotifyIndexed( + instance->thread_id, FURI_EVENT_LOOP_FLAG_NOTIFY_INDEX, FuriEventLoopFlagStop, eSetBits); +} + +void furi_event_loop_tick_set( + FuriEventLoop* instance, + uint32_t interval, + FuriEventLoopTickCallback callback, + void* context) { + furi_check(instance); + furi_check(instance->thread_id == furi_thread_get_current_id()); + furi_check(callback ? interval > 0 : true); + + instance->tick_interval = interval; + instance->tick_callback = callback; + instance->tick_callback_context = context; +} + +void furi_event_loop_message_queue_subscribe( + FuriEventLoop* instance, + FuriMessageQueue* message_queue, + FuriEventLoopEvent event, + FuriEventLoopMessageQueueCallback callback, + void* context) { + furi_check(instance); + furi_check(instance->thread_id == furi_thread_get_current_id()); + furi_check(instance->state == FuriEventLoopStateIdle); + furi_check(message_queue); + + FURI_CRITICAL_ENTER(); + + furi_check(FuriEventLoopTree_get(instance->tree, message_queue) == NULL); + + // Allocate and setup item + FuriEventLoopItem* item = furi_event_loop_item_alloc( + instance, &furi_message_queue_event_loop_contract, message_queue, event); + furi_event_loop_item_set_callback(item, callback, context); + + FuriEventLoopTree_set_at(instance->tree, message_queue, item); + + FuriEventLoopLink* link = item->contract->get_link(message_queue); + + if(item->event == FuriEventLoopEventIn) { + furi_check(link->item_in == NULL); + link->item_in = item; + } else if(item->event == FuriEventLoopEventOut) { + furi_check(link->item_out == NULL); + link->item_out = item; + } else { + furi_crash(); + } + + if(item->contract->get_level(item->object, item->event)) { + furi_event_loop_item_notify(item); + } + + FURI_CRITICAL_EXIT(); +} + +void furi_event_loop_message_queue_unsubscribe( + FuriEventLoop* instance, + FuriMessageQueue* message_queue) { + furi_check(instance); + furi_check(instance->state == FuriEventLoopStateIdle); + furi_check(instance->thread_id == furi_thread_get_current_id()); + + FURI_CRITICAL_ENTER(); + + FuriEventLoopItem** item_ptr = FuriEventLoopTree_get(instance->tree, message_queue); + furi_check(item_ptr); + + FuriEventLoopItem* item = *item_ptr; + furi_check(item); + furi_check(item->owner == instance); + + FuriEventLoopLink* link = item->contract->get_link(message_queue); + + if(item->event == FuriEventLoopEventIn) { + furi_check(link->item_in == item); + link->item_in = NULL; + } else if(item->event == FuriEventLoopEventOut) { + furi_check(link->item_out == item); + link->item_out = NULL; + } else { + furi_crash(); + } + + furi_event_loop_item_free(item); + + FuriEventLoopTree_erase(instance->tree, message_queue); + + FURI_CRITICAL_EXIT(); +} + +/* + * Event Loop Item API, used internally + */ + +static FuriEventLoopItem* furi_event_loop_item_alloc( + FuriEventLoop* owner, + const FuriEventLoopContract* contract, + void* object, + FuriEventLoopEvent event) { + furi_assert(owner); + furi_assert(object); + + FuriEventLoopItem* instance = malloc(sizeof(FuriEventLoopItem)); + + instance->owner = owner; + instance->contract = contract; + instance->object = object; + instance->event = event; + + WaitingList_init_field(instance); + + return instance; +} + +static void furi_event_loop_item_free(FuriEventLoopItem* instance) { + furi_assert(instance); + free(instance); +} + +static void furi_event_loop_item_set_callback( + FuriEventLoopItem* instance, + FuriEventLoopMessageQueueCallback callback, + void* callback_context) { + furi_assert(instance); + furi_assert(!instance->callback); + + instance->callback = callback; + instance->callback_context = callback_context; +} + +static void furi_event_loop_item_notify(FuriEventLoopItem* instance) { + furi_assert(instance); + + FURI_CRITICAL_ENTER(); + + if(!instance->WaitingList.prev && !instance->WaitingList.next) { + WaitingList_push_back(instance->owner->waiting_list, instance); + } + + FURI_CRITICAL_EXIT(); + + xTaskNotifyIndexed( + instance->owner->thread_id, + FURI_EVENT_LOOP_FLAG_NOTIFY_INDEX, + FuriEventLoopFlagEvent, + eSetBits); +} + +void furi_event_loop_link_notify(FuriEventLoopLink* instance, FuriEventLoopEvent event) { + furi_assert(instance); + + FURI_CRITICAL_ENTER(); + + if(event == FuriEventLoopEventIn) { + if(instance->item_in) furi_event_loop_item_notify(instance->item_in); + } else if(event == FuriEventLoopEventOut) { + if(instance->item_out) furi_event_loop_item_notify(instance->item_out); + } else { + furi_crash(); + } + + FURI_CRITICAL_EXIT(); +} \ No newline at end of file diff --git a/furi/core/event_loop.h b/furi/core/event_loop.h new file mode 100644 index 0000000000..7221a90bca --- /dev/null +++ b/furi/core/event_loop.h @@ -0,0 +1,134 @@ +/** + * @file event_loop.h + * @brief Furi Event Loop + * + * This module is designed to handle application event loop in fully + * asynchronous, reactive nature. On the low level this modules is + * inspired by epoll/kqueue concept, on the high level by asyncio + * event loop. + * + * This module is trying to best fit into Furi OS, so we don't + * provide any compatibility with other event driven APIs. But + * programming concepts are the same, except some runtime + * limitations from our side. + */ +#pragma once + +#include "base.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** Event Loop events */ +typedef enum { + FuriEventLoopEventOut, /**< On departure: item was retrieved from container, flag reset, etc... */ + FuriEventLoopEventIn, /**< On arrival: item was inserted into container, flag set, etc... */ +} FuriEventLoopEvent; + +/** Anonymous message queue type */ +typedef struct FuriEventLoop FuriEventLoop; + +/** Allocate Event Loop instance + * + * Couple things to keep in mind: + * - You can have 1 event_loop per 1 thread + * - You can not use event_loop instance in the other thread + * - Do not use blocking api to query object delegated to Event Loop + * + * @return The Event Loop instance + */ +FuriEventLoop* furi_event_loop_alloc(void); + +/** Free Event Loop instance + * + * @param instance The Event Loop instance + */ +void furi_event_loop_free(FuriEventLoop* instance); + +/** Continuously poll for events + * + * Can be stopped with `furi_event_loop_stop` + * + * @param instance The Event Loop instance + */ +void furi_event_loop_run(FuriEventLoop* instance); + +/** Stop Event Loop instance + * + * @param instance The Event Loop instance + */ +void furi_event_loop_stop(FuriEventLoop* instance); + +/* + * Tick related API + */ + +/** Tick callback type + * + * @param context The context for callback + */ +typedef void (*FuriEventLoopTickCallback)(void* context); + +/** Set Event Loop tick callback + * + * Tick callback called after specified inactivity time. It's not periodic. If + * Event Loop is busy then ticks will be skipped. + * + * @param instance The Event Loop instance + * @param[in] interval The tick interval + * @param[in] callback The callback to call + * @param context The context for callback + */ +void furi_event_loop_tick_set( + FuriEventLoop* instance, + uint32_t interval, + FuriEventLoopTickCallback callback, + void* context); + +/* + * Message queue related APIs + */ + +/** Anonymous message queue type */ +typedef struct FuriMessageQueue FuriMessageQueue; + +/** Callback type for message queue + * + * @param queue The queue that triggered event + * @param context The context that was provided on + * furi_event_loop_message_queue_subscribe call + * + * @return true if event was processed, false if we need to delay processing + */ +typedef bool (*FuriEventLoopMessageQueueCallback)(FuriMessageQueue* queue, void* context); + +/** Subscribe to message queue events + * + * @warning you can only have one subscription for one event type. + * + * @param instance The Event Loop instance + * @param message_queue The message queue to add + * @param[in] event The Event Loop event to trigger on + * @param[in] callback The callback to call on event + * @param context The context for callback + */ +void furi_event_loop_message_queue_subscribe( + FuriEventLoop* instance, + FuriMessageQueue* message_queue, + FuriEventLoopEvent event, + FuriEventLoopMessageQueueCallback callback, + void* context); + +/** Unsubscribe from message queue + * + * @param instance The Event Loop instance + * @param message_queue The message queue + */ +void furi_event_loop_message_queue_unsubscribe( + FuriEventLoop* instance, + FuriMessageQueue* message_queue); + +#ifdef __cplusplus +} +#endif diff --git a/furi/core/event_loop_i.h b/furi/core/event_loop_i.h new file mode 100644 index 0000000000..8ddd10966f --- /dev/null +++ b/furi/core/event_loop_i.h @@ -0,0 +1,33 @@ +#pragma once + +#include "event_loop.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct FuriEventLoopItem FuriEventLoopItem; + +/* Link between Event Loop */ + +typedef struct { + FuriEventLoopItem* item_in; + FuriEventLoopItem* item_out; +} FuriEventLoopLink; + +void furi_event_loop_link_notify(FuriEventLoopLink* instance, FuriEventLoopEvent event); + +/* Contract between event loop and an object */ + +typedef FuriEventLoopLink* (*FuriEventLoopContractGetLink)(void* object); + +typedef uint32_t (*FuriEventLoopContractGetLevel)(void* object, FuriEventLoopEvent event); + +typedef struct { + const FuriEventLoopContractGetLink get_link; + const FuriEventLoopContractGetLevel get_level; +} FuriEventLoopContract; + +#ifdef __cplusplus +} +#endif diff --git a/furi/core/message_queue.c b/furi/core/message_queue.c index 0454e289b5..cda775abe9 100644 --- a/furi/core/message_queue.c +++ b/furi/core/message_queue.c @@ -1,9 +1,4 @@ -#include "kernel.h" -#include "message_queue.h" -#include "check.h" - -#include -#include +#include "message_queue_i.h" // Internal FreeRTOS member names #define uxMessagesWaiting uxDummy4[0] @@ -12,6 +7,10 @@ struct FuriMessageQueue { StaticQueue_t container; + + // Event Loop Link + FuriEventLoopLink event_loop_link; + uint8_t buffer[]; }; @@ -42,6 +41,10 @@ void furi_message_queue_free(FuriMessageQueue* instance) { furi_check(furi_kernel_is_irq_or_masked() == 0U); furi_check(instance); + // Event Loop must be disconnected + furi_check(!instance->event_loop_link.item_in); + furi_check(!instance->event_loop_link.item_out); + vQueueDelete((QueueHandle_t)instance); free(instance); } @@ -82,6 +85,11 @@ FuriStatus } } + if(stat == FuriStatusOk) { + furi_event_loop_link_notify(&instance->event_loop_link, FuriEventLoopEventIn); + } + + /* Return execution status */ return stat; } @@ -120,6 +128,10 @@ FuriStatus furi_message_queue_get(FuriMessageQueue* instance, void* msg_ptr, uin } } + if(stat == FuriStatusOk) { + furi_event_loop_link_notify(&instance->event_loop_link, FuriEventLoopEventOut); + } + return stat; } @@ -182,5 +194,34 @@ FuriStatus furi_message_queue_reset(FuriMessageQueue* instance) { (void)xQueueReset(hQueue); } + if(stat == FuriStatusOk) { + furi_event_loop_link_notify(&instance->event_loop_link, FuriEventLoopEventOut); + } + + /* Return execution status */ return stat; } + +static FuriEventLoopLink* furi_message_queue_event_loop_get_link(void* object) { + FuriMessageQueue* instance = object; + furi_assert(instance); + return &instance->event_loop_link; +} + +static uint32_t furi_message_queue_event_loop_get_level(void* object, FuriEventLoopEvent event) { + FuriMessageQueue* instance = object; + furi_assert(instance); + + if(event == FuriEventLoopEventIn) { + return furi_message_queue_get_count(instance); + } else if(event == FuriEventLoopEventOut) { + return furi_message_queue_get_space(instance); + } else { + furi_crash(); + } +} + +const FuriEventLoopContract furi_message_queue_event_loop_contract = { + .get_link = furi_message_queue_event_loop_get_link, + .get_level = furi_message_queue_event_loop_get_level, +}; diff --git a/furi/core/message_queue_i.h b/furi/core/message_queue_i.h new file mode 100644 index 0000000000..aa24cfe54f --- /dev/null +++ b/furi/core/message_queue_i.h @@ -0,0 +1,12 @@ +#pragma once + +#include "message_queue.h" + +#include "kernel.h" +#include "event_loop_i.h" +#include "check.h" + +#include +#include + +extern const FuriEventLoopContract furi_message_queue_event_loop_contract; \ No newline at end of file diff --git a/furi/core/stream_buffer.c b/furi/core/stream_buffer.c index eefda0e790..879520010a 100644 --- a/furi/core/stream_buffer.c +++ b/furi/core/stream_buffer.c @@ -1,6 +1,6 @@ -#include "base.h" -#include "check.h" #include "stream_buffer.h" + +#include "check.h" #include "common_defines.h" #include diff --git a/furi/core/stream_buffer.h b/furi/core/stream_buffer.h index 3cc9c1b670..eef8ee5107 100644 --- a/furi/core/stream_buffer.h +++ b/furi/core/stream_buffer.h @@ -12,8 +12,8 @@ * interrupt that will read from the buffer (the reader). */ #pragma once -#include -#include + +#include "base.h" #ifdef __cplusplus extern "C" { diff --git a/furi/furi.h b/furi/furi.h index d8aec91c0f..24e597acfe 100644 --- a/furi/furi.h +++ b/furi/furi.h @@ -4,6 +4,7 @@ #include "core/check.h" #include "core/common_defines.h" +#include "core/event_loop.h" #include "core/event_flag.h" #include "core/kernel.h" #include "core/log.h" diff --git a/targets/f18/api_symbols.csv b/targets/f18/api_symbols.csv index 91999f7a0a..d732e991db 100644 --- a/targets/f18/api_symbols.csv +++ b/targets/f18/api_symbols.csv @@ -1098,6 +1098,13 @@ Function,+,furi_event_flag_free,void,FuriEventFlag* Function,+,furi_event_flag_get,uint32_t,FuriEventFlag* Function,+,furi_event_flag_set,uint32_t,"FuriEventFlag*, uint32_t" Function,+,furi_event_flag_wait,uint32_t,"FuriEventFlag*, uint32_t, uint32_t, uint32_t" +Function,-,furi_event_loop_alloc,FuriEventLoop*, +Function,-,furi_event_loop_free,void,FuriEventLoop* +Function,-,furi_event_loop_message_queue_subscribe,void,"FuriEventLoop*, FuriMessageQueue*, FuriEventLoopEvent, FuriEventLoopMessageQueueCallback, void*" +Function,-,furi_event_loop_message_queue_unsubscribe,void,"FuriEventLoop*, FuriMessageQueue*" +Function,-,furi_event_loop_run,void,FuriEventLoop* +Function,-,furi_event_loop_stop,void,FuriEventLoop* +Function,-,furi_event_loop_tick_set,void,"FuriEventLoop*, uint32_t, FuriEventLoopTickCallback, void*" Function,+,furi_get_tick,uint32_t, Function,+,furi_hal_adc_acquire,FuriHalAdcHandle*, Function,+,furi_hal_adc_configure,void,FuriHalAdcHandle* @@ -2696,6 +2703,7 @@ Function,+,view_dispatcher_alloc,ViewDispatcher*, Function,+,view_dispatcher_attach_to_gui,void,"ViewDispatcher*, Gui*, ViewDispatcherType" Function,+,view_dispatcher_enable_queue,void,ViewDispatcher* Function,+,view_dispatcher_free,void,ViewDispatcher* +Function,-,view_dispatcher_get_event_loop,FuriEventLoop*,ViewDispatcher* Function,+,view_dispatcher_remove_view,void,"ViewDispatcher*, uint32_t" Function,+,view_dispatcher_run,void,ViewDispatcher* Function,+,view_dispatcher_send_custom_event,void,"ViewDispatcher*, uint32_t" diff --git a/targets/f7/api_symbols.csv b/targets/f7/api_symbols.csv index 32160b56e0..2686cf1bf3 100644 --- a/targets/f7/api_symbols.csv +++ b/targets/f7/api_symbols.csv @@ -1201,6 +1201,13 @@ Function,+,furi_event_flag_free,void,FuriEventFlag* Function,+,furi_event_flag_get,uint32_t,FuriEventFlag* Function,+,furi_event_flag_set,uint32_t,"FuriEventFlag*, uint32_t" Function,+,furi_event_flag_wait,uint32_t,"FuriEventFlag*, uint32_t, uint32_t, uint32_t" +Function,-,furi_event_loop_alloc,FuriEventLoop*, +Function,-,furi_event_loop_free,void,FuriEventLoop* +Function,-,furi_event_loop_message_queue_subscribe,void,"FuriEventLoop*, FuriMessageQueue*, FuriEventLoopEvent, FuriEventLoopMessageQueueCallback, void*" +Function,-,furi_event_loop_message_queue_unsubscribe,void,"FuriEventLoop*, FuriMessageQueue*" +Function,-,furi_event_loop_run,void,FuriEventLoop* +Function,-,furi_event_loop_stop,void,FuriEventLoop* +Function,-,furi_event_loop_tick_set,void,"FuriEventLoop*, uint32_t, FuriEventLoopTickCallback, void*" Function,+,furi_get_tick,uint32_t, Function,+,furi_hal_adc_acquire,FuriHalAdcHandle*, Function,+,furi_hal_adc_configure,void,FuriHalAdcHandle* @@ -3515,6 +3522,7 @@ Function,+,view_dispatcher_alloc,ViewDispatcher*, Function,+,view_dispatcher_attach_to_gui,void,"ViewDispatcher*, Gui*, ViewDispatcherType" Function,+,view_dispatcher_enable_queue,void,ViewDispatcher* Function,+,view_dispatcher_free,void,ViewDispatcher* +Function,-,view_dispatcher_get_event_loop,FuriEventLoop*,ViewDispatcher* Function,+,view_dispatcher_remove_view,void,"ViewDispatcher*, uint32_t" Function,+,view_dispatcher_run,void,ViewDispatcher* Function,+,view_dispatcher_send_custom_event,void,"ViewDispatcher*, uint32_t" diff --git a/targets/f7/inc/FreeRTOSConfig.h b/targets/f7/inc/FreeRTOSConfig.h index 0abce558fc..e6233f624a 100644 --- a/targets/f7/inc/FreeRTOSConfig.h +++ b/targets/f7/inc/FreeRTOSConfig.h @@ -86,8 +86,12 @@ to exclude the API function. */ #define INCLUDE_xTaskGetSchedulerState 1 #define INCLUDE_xTimerPendFunctionCall 1 -/* Furi-specific */ -#define configTASK_NOTIFICATION_ARRAY_ENTRIES 2 +/* Workaround for various notification issues: + * - First one used by system primitives + * - Second one by thread event notification + * - Third one by FuriEventLoop + */ +#define configTASK_NOTIFICATION_ARRAY_ENTRIES 3 extern __attribute__((__noreturn__)) void furi_thread_catch(void); #define configTASK_RETURN_ADDRESS (furi_thread_catch + 2)