1
mirror of https://github.com/DarkFlippers/unleashed-firmware.git synced 2025-12-12 04:34:43 +04:00

Merge remote-tracking branch 'OFW/dev' into dev

This commit is contained in:
MX
2024-12-24 14:04:07 +03:00
10 changed files with 756 additions and 6 deletions

View File

@@ -30,6 +30,7 @@ env.Append(
File("stream/string_stream.h"),
File("stream/buffered_file_stream.h"),
File("strint.h"),
File("pipe.h"),
File("protocols/protocol_dict.h"),
File("pretty_format.h"),
File("hex.h"),

241
lib/toolbox/pipe.c Normal file
View File

@@ -0,0 +1,241 @@
#include "pipe.h"
#include <furi.h>
/**
* Data shared between both sides.
*/
typedef struct {
FuriSemaphore* instance_count; // <! 1 = both sides, 0 = only one side
FuriMutex* state_transition;
} PipeShared;
/**
* There are two PipeSides per pipe.
*/
struct PipeSide {
PipeRole role;
PipeShared* shared;
FuriStreamBuffer* sending;
FuriStreamBuffer* receiving;
FuriEventLoop* event_loop;
void* callback_context;
PipeSideDataArrivedCallback on_data_arrived;
PipeSideSpaceFreedCallback on_space_freed;
PipeSideBrokenCallback on_pipe_broken;
};
PipeSideBundle pipe_alloc(size_t capacity, size_t trigger_level) {
PipeSideReceiveSettings settings = {
.capacity = capacity,
.trigger_level = trigger_level,
};
return pipe_alloc_ex(settings, settings);
}
PipeSideBundle pipe_alloc_ex(PipeSideReceiveSettings alice, PipeSideReceiveSettings bob) {
// the underlying primitives are shared
FuriStreamBuffer* alice_to_bob = furi_stream_buffer_alloc(bob.capacity, bob.trigger_level);
FuriStreamBuffer* bob_to_alice = furi_stream_buffer_alloc(alice.capacity, alice.trigger_level);
PipeShared* shared = malloc(sizeof(PipeShared));
*shared = (PipeShared){
.instance_count = furi_semaphore_alloc(1, 1),
.state_transition = furi_mutex_alloc(FuriMutexTypeNormal),
};
PipeSide* alices_side = malloc(sizeof(PipeSide));
PipeSide* bobs_side = malloc(sizeof(PipeSide));
*alices_side = (PipeSide){
.role = PipeRoleAlice,
.shared = shared,
.sending = alice_to_bob,
.receiving = bob_to_alice,
};
*bobs_side = (PipeSide){
.role = PipeRoleBob,
.shared = shared,
.sending = bob_to_alice,
.receiving = alice_to_bob,
};
return (PipeSideBundle){.alices_side = alices_side, .bobs_side = bobs_side};
}
PipeRole pipe_role(PipeSide* pipe) {
furi_check(pipe);
return pipe->role;
}
PipeState pipe_state(PipeSide* pipe) {
furi_check(pipe);
uint32_t count = furi_semaphore_get_count(pipe->shared->instance_count);
return (count == 1) ? PipeStateOpen : PipeStateBroken;
}
void pipe_free(PipeSide* pipe) {
furi_check(pipe);
furi_check(!pipe->event_loop);
furi_mutex_acquire(pipe->shared->state_transition, FuriWaitForever);
FuriStatus status = furi_semaphore_acquire(pipe->shared->instance_count, 0);
if(status == FuriStatusOk) {
// the other side is still intact
furi_mutex_release(pipe->shared->state_transition);
free(pipe);
} else {
// the other side is gone too
furi_stream_buffer_free(pipe->sending);
furi_stream_buffer_free(pipe->receiving);
furi_semaphore_free(pipe->shared->instance_count);
furi_mutex_free(pipe->shared->state_transition);
free(pipe->shared);
free(pipe);
}
}
static void _pipe_stdout_cb(const char* data, size_t size, void* context) {
furi_assert(context);
PipeSide* pipe = context;
while(size) {
size_t sent = pipe_send(pipe, data, size, FuriWaitForever);
data += sent;
size -= sent;
}
}
static size_t _pipe_stdin_cb(char* data, size_t size, FuriWait timeout, void* context) {
furi_assert(context);
PipeSide* pipe = context;
return pipe_receive(pipe, data, size, timeout);
}
void pipe_install_as_stdio(PipeSide* pipe) {
furi_check(pipe);
furi_thread_set_stdout_callback(_pipe_stdout_cb, pipe);
furi_thread_set_stdin_callback(_pipe_stdin_cb, pipe);
}
size_t pipe_receive(PipeSide* pipe, void* data, size_t length, FuriWait timeout) {
furi_check(pipe);
return furi_stream_buffer_receive(pipe->receiving, data, length, timeout);
}
size_t pipe_send(PipeSide* pipe, const void* data, size_t length, FuriWait timeout) {
furi_check(pipe);
return furi_stream_buffer_send(pipe->sending, data, length, timeout);
}
size_t pipe_bytes_available(PipeSide* pipe) {
furi_check(pipe);
return furi_stream_buffer_bytes_available(pipe->receiving);
}
size_t pipe_spaces_available(PipeSide* pipe) {
furi_check(pipe);
return furi_stream_buffer_spaces_available(pipe->sending);
}
static void pipe_receiving_buffer_callback(FuriEventLoopObject* buffer, void* context) {
UNUSED(buffer);
PipeSide* pipe = context;
furi_assert(pipe);
if(pipe->on_space_freed) pipe->on_data_arrived(pipe, pipe->callback_context);
}
static void pipe_sending_buffer_callback(FuriEventLoopObject* buffer, void* context) {
UNUSED(buffer);
PipeSide* pipe = context;
furi_assert(pipe);
if(pipe->on_data_arrived) pipe->on_space_freed(pipe, pipe->callback_context);
}
static void pipe_semaphore_callback(FuriEventLoopObject* semaphore, void* context) {
UNUSED(semaphore);
PipeSide* pipe = context;
furi_assert(pipe);
if(pipe->on_pipe_broken) pipe->on_pipe_broken(pipe, pipe->callback_context);
}
void pipe_attach_to_event_loop(PipeSide* pipe, FuriEventLoop* event_loop) {
furi_check(pipe);
furi_check(event_loop);
furi_check(!pipe->event_loop);
pipe->event_loop = event_loop;
}
void pipe_detach_from_event_loop(PipeSide* pipe) {
furi_check(pipe);
furi_check(pipe->event_loop);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->receiving);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->sending);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->shared->instance_count);
pipe->event_loop = NULL;
}
void pipe_set_callback_context(PipeSide* pipe, void* context) {
furi_check(pipe);
pipe->callback_context = context;
}
void pipe_set_data_arrived_callback(
PipeSide* pipe,
PipeSideDataArrivedCallback callback,
FuriEventLoopEvent event) {
furi_check(pipe);
furi_check(pipe->event_loop);
furi_check((event & FuriEventLoopEventMask) == 0);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->receiving);
pipe->on_data_arrived = callback;
if(callback)
furi_event_loop_subscribe_stream_buffer(
pipe->event_loop,
pipe->receiving,
FuriEventLoopEventIn | event,
pipe_receiving_buffer_callback,
pipe);
}
void pipe_set_space_freed_callback(
PipeSide* pipe,
PipeSideSpaceFreedCallback callback,
FuriEventLoopEvent event) {
furi_check(pipe);
furi_check(pipe->event_loop);
furi_check((event & FuriEventLoopEventMask) == 0);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->sending);
pipe->on_space_freed = callback;
if(callback)
furi_event_loop_subscribe_stream_buffer(
pipe->event_loop,
pipe->sending,
FuriEventLoopEventOut | event,
pipe_sending_buffer_callback,
pipe);
}
void pipe_set_broken_callback(
PipeSide* pipe,
PipeSideBrokenCallback callback,
FuriEventLoopEvent event) {
furi_check(pipe);
furi_check(pipe->event_loop);
furi_check((event & FuriEventLoopEventMask) == 0);
furi_event_loop_maybe_unsubscribe(pipe->event_loop, pipe->shared->instance_count);
pipe->on_pipe_broken = callback;
if(callback)
furi_event_loop_subscribe_semaphore(
pipe->event_loop,
pipe->shared->instance_count,
FuriEventLoopEventOut | event,
pipe_semaphore_callback,
pipe);
}

295
lib/toolbox/pipe.h Normal file
View File

@@ -0,0 +1,295 @@
/**
* @file pipe.h
* Pipe convenience module
*
* Pipes are used to send bytes between two threads in both directions. The two
* threads are referred to as Alice and Bob and their abilities regarding what
* they can do with the pipe are equal.
*
* It is also possible to use both sides of the pipe within one thread.
*/
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <furi.h>
#include <stddef.h>
/**
* @brief The role of a pipe side
*
* Both roles are equal, as they can both read and write the data. This status
* might be helpful in determining the role of a thread w.r.t. another thread in
* an application that builds on the pipe.
*/
typedef enum {
PipeRoleAlice,
PipeRoleBob,
} PipeRole;
/**
* @brief The state of a pipe
*
* - `PipeStateOpen`: Both pipe sides are in place, meaning data that is sent
* down the pipe _might_ be read by the peer, and new data sent by the peer
* _might_ arrive.
* - `PipeStateBroken`: The other side of the pipe has been freed, meaning
* data that is written will never reach its destination, and no new data
* will appear in the buffer.
*
* A broken pipe can never become open again, because there's no way to connect
* a side of a pipe to another side of a pipe.
*/
typedef enum {
PipeStateOpen,
PipeStateBroken,
} PipeState;
typedef struct PipeSide PipeSide;
typedef struct {
PipeSide* alices_side;
PipeSide* bobs_side;
} PipeSideBundle;
typedef struct {
size_t capacity;
size_t trigger_level;
} PipeSideReceiveSettings;
/**
* @brief Allocates two connected sides of one pipe.
*
* Creating a pair of sides using this function is the only way to connect two
* pipe sides together. Two unrelated orphaned sides may never be connected back
* together.
*
* The capacity and trigger level for both directions are the same when the pipe
* is created using this function. Use `pipe_alloc_ex` if you want more
* control.
*
* @param capacity Maximum number of bytes buffered in one direction
* @param trigger_level Number of bytes that need to be available in the buffer
* in order for a blocked thread to unblock
* @returns Bundle with both sides of the pipe
*/
PipeSideBundle pipe_alloc(size_t capacity, size_t trigger_level);
/**
* @brief Allocates two connected sides of one pipe.
*
* Creating a pair of sides using this function is the only way to connect two
* pipe sides together. Two unrelated orphaned sides may never be connected back
* together.
*
* The capacity and trigger level may be different for the two directions when
* the pipe is created using this function. Use `pipe_alloc` if you don't
* need control this fine.
*
* @param alice `capacity` and `trigger_level` settings for Alice's receiving
* buffer
* @param bob `capacity` and `trigger_level` settings for Bob's receiving buffer
* @returns Bundle with both sides of the pipe
*/
PipeSideBundle pipe_alloc_ex(PipeSideReceiveSettings alice, PipeSideReceiveSettings bob);
/**
* @brief Gets the role of a pipe side.
*
* The roles (Alice and Bob) are equal, as both can send and receive data. This
* status might be helpful in determining the role of a thread w.r.t. another
* thread.
*
* @param [in] pipe Pipe side to query
* @returns Role of provided pipe side
*/
PipeRole pipe_role(PipeSide* pipe);
/**
* @brief Gets the state of a pipe.
*
* When the state is `PipeStateOpen`, both sides are active and may send or
* receive data. When the state is `PipeStateBroken`, only one side is active
* (the one that this method has been called on). If you find yourself in that
* state, the data that you send will never be heard by anyone, and the data you
* receive are leftovers in the buffer.
*
* @param [in] pipe Pipe side to query
* @returns State of the pipe
*/
PipeState pipe_state(PipeSide* pipe);
/**
* @brief Frees a side of a pipe.
*
* When only one of the sides is freed, the pipe is transitioned from the "Open"
* state into the "Broken" state. When both sides are freed, the underlying data
* structures are freed too.
*
* @param [in] pipe Pipe side to free
*/
void pipe_free(PipeSide* pipe);
/**
* @brief Connects the pipe to the `stdin` and `stdout` of the current thread.
*
* After performing this operation, you can use `getc`, `puts`, etc. to send and
* receive data to and from the pipe. If the pipe becomes broken, C stdlib calls
* will return `EOF` wherever possible.
*
* You can disconnect the pipe by manually calling
* `furi_thread_set_stdout_callback` and `furi_thread_set_stdin_callback` with
* `NULL`.
*
* @param [in] pipe Pipe side to connect to the stdio
*/
void pipe_install_as_stdio(PipeSide* pipe);
/**
* @brief Receives data from the pipe.
*
* @param [in] pipe The pipe side to read data out of
* @param [out] data The buffer to fill with data
* @param length Maximum length of data to read
* @param timeout The timeout (in ticks) after which the read operation is
* interrupted
* @returns The number of bytes actually written into the provided buffer
*/
size_t pipe_receive(PipeSide* pipe, void* data, size_t length, FuriWait timeout);
/**
* @brief Sends data into the pipe.
*
* @param [in] pipe The pipe side to send data into
* @param [out] data The buffer to get data from
* @param length Maximum length of data to send
* @param timeout The timeout (in ticks) after which the write operation is
* interrupted
* @returns The number of bytes actually read from the provided buffer
*/
size_t pipe_send(PipeSide* pipe, const void* data, size_t length, FuriWait timeout);
/**
* @brief Determines how many bytes there are in the pipe available to be read.
*
* @param [in] pipe Pipe side to query
* @returns Number of bytes available to be read out from that side of the pipe
*/
size_t pipe_bytes_available(PipeSide* pipe);
/**
* @brief Determines how many space there is in the pipe for data to be written
* into.
*
* @param [in] pipe Pipe side to query
* @returns Number of bytes available to be written into that side of the pipe
*/
size_t pipe_spaces_available(PipeSide* pipe);
/**
* @brief Attaches a `PipeSide` to a `FuriEventLoop`, allowing to attach
* callbacks to the PipeSide.
*
* @param [in] pipe Pipe side to attach to the event loop
* @param [in] event_loop Event loop to attach the pipe side to
*/
void pipe_attach_to_event_loop(PipeSide* pipe, FuriEventLoop* event_loop);
/**
* @brief Detaches a `PipeSide` from the `FuriEventLoop` that it was previously
* attached to.
*
* @param [in] pipe Pipe side to detach to the event loop
*/
void pipe_detach_from_event_loop(PipeSide* pipe);
/**
* @brief Callback for when data arrives to a `PipeSide`.
*
* @param [in] pipe Pipe side that called the callback
* @param [inout] context Custom context
*/
typedef void (*PipeSideDataArrivedCallback)(PipeSide* pipe, void* context);
/**
* @brief Callback for when data is read out of the opposite `PipeSide`.
*
* @param [in] pipe Pipe side that called the callback
* @param [inout] context Custom context
*/
typedef void (*PipeSideSpaceFreedCallback)(PipeSide* pipe, void* context);
/**
* @brief Callback for when the opposite `PipeSide` is freed, making the pipe
* broken.
*
* @param [in] pipe Pipe side that called the callback
* @param [inout] context Custom context
*/
typedef void (*PipeSideBrokenCallback)(PipeSide* pipe, void* context);
/**
* @brief Sets the custom context for all callbacks.
*
* @param [in] pipe Pipe side to set the context of
* @param [inout] context Custom context that will be passed to callbacks
*/
void pipe_set_callback_context(PipeSide* pipe, void* context);
/**
* @brief Sets the callback for when data arrives.
*
* @param [in] pipe Pipe side to assign the callback to
* @param [in] callback Callback to assign to the pipe side. Set to NULL to
* unsubscribe.
* @param [in] event Additional event loop flags (e.g. `Edge`, `Once`, etc.).
* Non-flag values of the enum are not allowed.
*
* @warning Attach the pipe side to an event loop first using
* `pipe_attach_to_event_loop`.
*/
void pipe_set_data_arrived_callback(
PipeSide* pipe,
PipeSideDataArrivedCallback callback,
FuriEventLoopEvent event);
/**
* @brief Sets the callback for when data is read out of the opposite `PipeSide`.
*
* @param [in] pipe Pipe side to assign the callback to
* @param [in] callback Callback to assign to the pipe side. Set to NULL to
* unsubscribe.
* @param [in] event Additional event loop flags (e.g. `Edge`, `Once`, etc.).
* Non-flag values of the enum are not allowed.
*
* @warning Attach the pipe side to an event loop first using
* `pipe_attach_to_event_loop`.
*/
void pipe_set_space_freed_callback(
PipeSide* pipe,
PipeSideSpaceFreedCallback callback,
FuriEventLoopEvent event);
/**
* @brief Sets the callback for when the opposite `PipeSide` is freed, making
* the pipe broken.
*
* @param [in] pipe Pipe side to assign the callback to
* @param [in] callback Callback to assign to the pipe side. Set to NULL to
* unsubscribe.
* @param [in] event Additional event loop flags (e.g. `Edge`, `Once`, etc.).
* Non-flag values of the enum are not allowed.
*
* @warning Attach the pipe side to an event loop first using
* `pipe_attach_to_event_loop`.
*/
void pipe_set_broken_callback(
PipeSide* pipe,
PipeSideBrokenCallback callback,
FuriEventLoopEvent event);
#ifdef __cplusplus
}
#endif