zephyrproject-rtos / zephyr

Primary Git Repository for the Zephyr Project. Zephyr is a new generation, scalable, optimized, secure RTOS for multiple hardware architectures.
https://docs.zephyrproject.org
Apache License 2.0
10.71k stars 6.54k forks source link

Static lightweight event publish-subscribe bus #77211

Closed LuoZhongYao closed 1 month ago

LuoZhongYao commented 2 months ago

Introduction

Ebus is a static event publish-subscribe bus, where publishers and subscribers have a many-to-many relationship. Compared to zbus, ebus is lighter.

Problem description

There are many scenarios where zbus is too heavyweight and cumbersome to use, and callback function registration is highly coupled.

Proposed change

Detailed RFC

Events are identified by a `uint32_t` id built from a `uint16_t` namespace and a `uint16_t` event: `ebus_id = namespace << 16 | event`. A module registers its namespace with `EBUS_NAMESPACE_REGISTER` (e.g. `EBUS_NAMESPACE_REGISTER(power)` registers the `power` namespace) and defines its events within that namespace as needed. A user declares the namespace with `EBUS_NAMESPACE_DECLARE` (e.g. `EBUS_NAMESPACE_DECLARE(power)` declares the `power` namespace), then uses `EBUS_SUBSCRIBE(namespace, ev, callback, userdata)` to listen for `ev` events from `namespace`. When a publisher publishes an `ev` event from `namespace` using `EBUS_PUBLISH(namespace, ev)`, the subscriber's `callback` is called. Since `ebus_id` is a constant, a perfect hash table can be generated using `gperf` to speed up the lookup. The initial design is as follows.

pragma once

if !defined(CONFIG_EBUS)

define EBUS_NAMESPACE_REGISTER(name)

define EBUS_NAMESPACE_DECLARE(name)

define EBUS_PUBLISH(_namespace, _ev, ...)

define EBUS_SUBSCRIBE(_namespace, _ev, _callback, _userdata)

else / CONFIG_EBUS /

include

include

define ebustext attribute((section__(".TEXT.ebus")))

/*
 * Subscriber callback.
 *
 * ebus:     id of the event being delivered
 * userdata: pointer stored in the matching struct ebus_subscriber
 * appendix: payload pointer supplied by the publisher, or NULL
 */
typedef void (*ebus_subscriber_t)(uint32_t ebus, void *userdata, void *appendix);

/* One callback together with the private pointer handed back to it. */
struct ebus_subscriber {
	void *userdata;
	ebus_subscriber_t subscriber;
};

/*
 * Build-time record linking an event in a namespace to a subscriber.
 * Three pointer-sized fields; the gperf generator script unpacks it with
 * exactly that layout, so keep the two in sync.
 */
struct ebus_slot_subscriber {
	uintptr_t ev;
	const uint8_t *namespace;
	const struct ebus_subscriber *subscriber;
};

/* Optional payload descriptor passed to ebus_publish(). */
struct ebus_publish {
	void *appendix;
	/* Called with this descriptor after every subscriber has run; may be NULL. */
	void (*drop)(struct ebus_publish *publish);
};

/* Section bounds; the names match the symbols used by ebus.c and the linker script. */
extern uint8_t __ebus_namespace_start[];
extern uint8_t __ebus_namespace_end[];
extern const struct ebus_subscriber __ebus_subscriber_start[];
extern const struct ebus_subscriber __ebus_subscriber_end[];

extern int ebus_publish(uint32_t ebus, struct ebus_publish *publish);

/*
 * Event id: address of the namespace marker in the upper 16 bits, event
 * number in the lower 16. The pointer is converted through uintptr_t so the
 * narrowing to 32 bits is explicit on targets with wider pointers.
 */
#define EBUS_ORIGINAL(_namespace, _ev) \
	(((uint32_t)(uintptr_t)(&EBUS_NAMESPACE(_namespace)) << 16) | (_ev))

/* Name of the one-byte marker object that stands for a namespace. */
#define EBUS_NAMESPACE(_name) UTIL_CAT(__ebusnamespace, _name)

/* Define the marker for namespace _name (use once, in the owning module). */
#define EBUS_NAMESPACE_REGISTER(_name) \
	const uint8_t __attribute__((__section__(".__ebus_slot_namespace"))) EBUS_NAMESPACE(_name)

/* Declare a namespace registered elsewhere so it can be published/subscribed. */
#define EBUS_NAMESPACE_DECLARE(_name) extern const uint8_t EBUS_NAMESPACE(_name)

/*
 * Publish event _ev of _namespace. The optional third argument is a
 * struct ebus_publish pointer; NULL is passed when it is omitted.
 */
#define EBUS_PUBLISH(_namespace, _ev, ...) \
	ebus_publish(EBUS_ORIGINAL(_namespace, _ev), GET_ARG_N(1, ##__VA_ARGS__, NULL))

/*
 * Emit the subscriber entry and the slot record that ties it to
 * (_namespace, _ev). _idx keeps the generated identifiers unique.
 */
#define _EBUS_SUBSCRIBE(_namespace, _ev, _subscriber, _userdata, _idx) \
	static __attribute__((__section__(".__ebus_subscriber"))) \
	const struct ebus_subscriber \
	MACRO_MAP_CAT(UTIL_EXPAND, __ebus_, _namespace, _, _ev, _, _idx) = { \
		.userdata = _userdata, \
		.subscriber = _subscriber, \
	}; \
	\
	static __used __attribute__((__section__(".__ebus_slot_subscriber"))) \
	const struct ebus_slot_subscriber \
	MACRO_MAP_CAT(UTIL_EXPAND, __ebus_slot_subscriber_, _namespace, _, _ev, _, _idx) = { \
		.ev = _ev, \
		.namespace = &EBUS_NAMESPACE(_namespace), \
		.subscriber = &MACRO_MAP_CAT(UTIL_EXPAND, __ebus_, _namespace, _, _ev, _, _idx), \
	}

/* Subscribe _subscriber (called with _userdata) to event _ev of _namespace. */
#define EBUS_SUBSCRIBE(_namespace, _ev, _subscriber, _userdata) \
	_EBUS_SUBSCRIBE(_namespace, _ev, _subscriber, _userdata, __COUNTER__)

endif / CONFIG_EBUS /


* ebus.c
```c
#include <ebus.h>
#include <init.h>
#include <zephyr.h>

/* One queued publication, exactly as handed to ebus_publish(). */
struct ebus_item {
    uint32_t ebus;                /* event id */
    struct ebus_publish *publish; /* optional payload descriptor; the dispatch loop checks it for NULL */
};

/*
 * Queue of struct ebus_item entries (CONFIG_EBUS_NUM_MSGQ of them) filled by
 * ebus_publish() and drained by the dispatch thread.
 */
K_MSGQ_DEFINE(ebus_msgq, sizeof(struct ebus_item), CONFIG_EBUS_NUM_MSGQ, 4);

int ebus_publish(uint32_t ebus, struct ebus_publish *publish)
{
    struct ebus_item item = {ebus, publish};
    return k_msgq_put(&ebus_msgq, &item, K_NO_WAIT);
}

/*
 * Weak fallback lookup: reports that no event has subscribers.
 *
 * Returns a pointer to a list holding only the terminator value
 * (all bits set), so the dispatch loop ends immediately.
 */
__ebustext __weak const uint32_t *z_ebus_subscriber_lookup(uintptr_t ebus)
{
    static const uint32_t no_subscribers = (uint32_t)-1;

    return &no_subscribers;
}

/*
 * Dispatch thread entry: take queued publications one at a time and call
 * every subscriber registered for the event.
 *
 * The signature matches k_thread_entry_t, so the function can be handed to
 * K_THREAD_DEFINE without a cast. The three arguments are unused.
 */
static void ebus_issue(void *p1, void *p2, void *p3)
{
    struct ebus_item item;

    (void)p1;
    (void)p2;
    (void)p3;

    for (;;) {
        if (k_msgq_get(&ebus_msgq, &item, K_FOREVER) != 0) {
            continue;
        }

        void *appendix = (item.publish != NULL) ? item.publish->appendix : NULL;

        /* The lookup returns subscriber indices terminated by an all-ones entry. */
        for (const uint32_t *subscriber = z_ebus_subscriber_lookup(item.ebus);
             *subscriber != (uint32_t)-1; subscriber++) {
            const struct ebus_subscriber *entry = &__ebus_subscriber_start[*subscriber];

            entry->subscriber(item.ebus, entry->userdata, appendix);
            if (!IS_ENABLED(CONFIG_EBUS_NO_YIELD)) {
                k_yield();
            }
        }

        /* Hand the descriptor back to the publisher once all subscribers have run. */
        if (item.publish != NULL && item.publish->drop != NULL) {
            item.publish->drop(item.publish);
        }
    }
}

K_THREAD_DEFINE(ebus_thread, CONFIG_EBUS_THREAD_STACK_SIZE, ebus_issue, NULL, NULL, NULL,
        CONFIG_EBUS_THREAD_PRIORITY, 0, 0);

SPDX-License-Identifier: Apache-2.0

import sys import argparse import os import struct import pickle from packaging import version

import elftools from elftools.elf.elffile import ELFFile from elftools.elf.sections import SymbolTableSection import elftools.elf.enums

if version.parse(elftools.version) < version.parse('0.24'): sys.exit("pyelftools is out of date, need version 0.24 or later")

scr = os.path.basename(sys.argv[0])

def parse_args():
    """Parse the command line into the module-level ``args`` namespace.

    Options: ``-k/--kernel`` (input ELF, required), ``-o/--gperf`` (output
    gperf file, required) and ``-v/--verbose``. Verbose is also switched on
    when ``VERBOSE`` is present in the environment.
    """
    global args

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument("-k", "--kernel", required=True,
                        help="Input zephyr ELF binary")
    parser.add_argument("-o", "--gperf", required=True,
                        help="Output list of ebus subscriber for gperf use")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print extra debugging information")

    args = parser.parse_args()
    if "VERBOSE" in os.environ:
        args.verbose = True

class Slot:
    """Decoder for ``struct ebus_slot_subscriber`` records found in an ELF.

    Built from one symbol table section. It records the address ranges of
    the ebus linker sections and turns slot symbols into ``Subscriber``
    objects.
    """

    # Linker symbols that must be present; every one of them is read below.
    _REQUIRED_SYMBOLS = (
        "__ebus_namespace_start",
        "__ebus_namespace_end",
        "__ebus_subscriber_start",
        "__ebus_subscriber_end",
        "__ebus_slot_subscriber_start",
        "__ebus_slot_subscriber_end",
    )

    def __init__(self, elf, section):
        """Collect the section bounds from ``section``'s symbols.

        Raises AssertionError when a required linker symbol is missing.
        """
        symbols = {sym.name: sym.entry.st_value for sym in section.iter_symbols()}
        for name in self._REQUIRED_SYMBOLS:
            assert name in symbols, f"required {name} symbol"

        self.elf = elf
        # size: bytes per subscriber entry (two pointers); pointer: struct format code.
        (self.size, self.pointer) = (16, "Q") if "CONFIG_64BIT" in symbols else (8, "I")
        self.slot = (symbols['__ebus_slot_subscriber_start'], symbols['__ebus_slot_subscriber_end'])
        self.namespace = (symbols['__ebus_namespace_start'], symbols['__ebus_namespace_end'])
        self.subscriber = (symbols['__ebus_subscriber_start'], symbols['__ebus_subscriber_end'])

    def make(self, sym):
        """Decode slot symbol ``sym`` into a ``Subscriber``.

        The namespace becomes an offset from the start of the namespace
        section and the subscriber becomes an index into the subscriber
        table. Raises AssertionError if the record cannot be read or points
        outside the expected sections.
        """
        fields = self.symbol_handle_data(sym)
        assert fields is not None, f"{sym.name} has no data in any ELF section"
        (event, namespace, subscriber) = fields
        assert self.namespace[0] <= namespace < self.namespace[1], f"{sym.name} Not a valid ebus namespace"
        assert self.subscriber[0] <= subscriber < self.subscriber[1], f"{sym.name} Not a valid ebus subscriber"
        namespace -= self.namespace[0]
        subscriber = (subscriber - self.subscriber[0]) // self.size
        return Subscriber(sym, event, namespace, subscriber)

    def symbol_data(self, sym):
        """Return the raw bytes backing ``sym``, or None if no section holds them."""
        addr = sym.entry.st_value
        size = sym.entry.st_size
        # NOTE(review): the match is by address only; sections that share an
        # address range could shadow each other -- confirm against the linker layout.
        for section in self.elf.iter_sections():
            start = section['sh_addr']
            end = start + section['sh_size']
            if start <= addr and addr + size <= end:
                offset = addr - start
                return bytes(section.data()[offset:offset + size])
        return None

    def valid(self, sym):
        """True if ``sym`` is a slot record: right name prefix and inside the slot section."""
        return sym.name.startswith("__ebus_slot_subscriber_") and (self.slot[0] <= sym.entry.st_value < self.slot[1])

    def symbol_handle_data(self, sym):
        """Unpack ``sym`` as three pointer-sized integers ``(ev, namespace, subscriber)``.

        Returns None when the symbol has no data.
        """
        data = self.symbol_data(sym)
        if not data:
            return None
        byte_order = '<' if self.elf.little_endian else '>'
        return struct.unpack(f"{byte_order}{self.pointer * 3}", data)

class Subscriber:
    """One subscription decoded from a slot record.

    Attributes:
        sym: the ELF symbol the record came from
        event: event number within the namespace
        namespace: namespace value (offset into the namespace section)
        subscriber: index into the subscriber table
        ebus: combined event id, ``namespace << 16 | event``
    """

    def __init__(self, sym, event, namespace, subscriber):
        self.sym = sym
        self.event = event
        self.namespace = namespace
        self.subscriber = subscriber
        self.ebus = (self.namespace << 16) | event

-- GPERF generation logic

header = """%compare-lengths %define lookup-function-name z_ebus_entry_lookup %readonly-tables %global-table %language=ANSI-C %struct-type %{

include

include

%} struct ebus_entry;

struct ebus_entry { const char name; const uint32_t subscriber; };

"""

Different versions of gperf have different prototypes for the lookup

function, best to implement the wrapper here. The pointer value itself is

turned into a string, we told gperf to expect binary strings that are not

NULL-terminated.

footer = """%% __ebustext const uint32_t z_ebus_subscriber_lookup(uintptr_t ebus) { static const uint32_t dummy = { -1 }; const struct ebus_entry entry = z_ebus_entry_lookup((const char )ebus, sizeof(void )); if (entry) { return entry->subscriber; }

return &dummy;

} """

def write_gperf(fp, subscribers):
    """Write the gperf input for all subscriptions to ``fp``.

    ``subscribers`` maps an event id to the list of objects subscribed to
    it; each object needs an integer ``subscriber`` attribute. The output is
    the module-level ``header``, one ``-1``-terminated C index table per
    event id, a ``%%`` separator, one keyword line per event id, and the
    module-level ``footer``.
    """
    sber_tbl = ""
    ebus_tbl = ""
    fp.write(header)
    for ebus, sber in subscribers.items():
        # One name for both the definition and the keyword line, so they cannot drift apart.
        table = f"__ebus_ord_subscriber_{ebus:08X}"
        sber_tbl += f"static const uint32_t {table}[] = {{"
        for count, it in enumerate(sber):
            # Start a new line every 16 entries. Kept outside the f-string
            # because backslashes in replacement fields need Python 3.12+.
            separator = " " if count % 16 else "\n\t"
            sber_tbl += f"{separator}{it.subscriber:d},"

        sber_tbl += "\n\t-1\n};\n\n"
        # Key is the four id bytes, least significant first, as \xNN escapes.
        key = "".join(f"\\x{(ebus >> shift) & 0xff:02X}" for shift in (0, 8, 16, 24))
        ebus_tbl += f'"{key}", {table}\n'

    fp.write(sber_tbl)
    fp.write("%%\n")
    fp.write(ebus_tbl)
    fp.write(footer)

def main():
    """Scan the kernel ELF for ebus slot records and write the gperf input.

    Every STT_OBJECT symbol accepted by ``Slot.valid`` is decoded and grouped
    by event id; the result goes to the file named by ``--gperf``.
    """
    parse_args()

    assert args.kernel, "--kernel ELF required to extract data"

    subscribers = {}
    # Section-bound symbols, never slot records themselves.
    want_constants = {
        "__ebus_subscriber_start",
        "__ebus_subscriber_end",
        "__ebus_namespace_start",
        "__ebus_namespace_end",
        "__ebus_slot_subscriber_start",
        "__ebus_slot_subscriber_end",
    }

    # Keep the file open while decoding: section data is read from it on
    # demand. The context manager closes it afterwards.
    with open(args.kernel, "rb") as kernel:
        elf = ELFFile(kernel)

        for section in elf.iter_sections():
            if not isinstance(section, SymbolTableSection):
                continue

            slot = Slot(elf, section)
            for sym in section.iter_symbols():
                if sym.name in want_constants:
                    continue
                if sym.entry.st_info.type != 'STT_OBJECT':
                    continue
                if not slot.valid(sym):
                    continue

                subscriber = slot.make(sym)
                subscribers.setdefault(subscriber.ebus, []).append(subscriber)

    if args.gperf:
        with open(args.gperf, "w") as fp:
            write_gperf(fp, subscribers)


if __name__ == "__main__":
    main()


* ebus-sections.ld
```ldscript
    SECTION_PROLOGUE(.ebus.namespace, 0 (NOLOAD), AT(_end))
    {
        __ebus_namespace_start = .;
        KEEP(*(SORT(.__ebus_slot_namespace*)));
        __ebus_namespace_end = .;
    }

#ifndef LINKER_PASS2
    SECTION_PROLOGUE(.ebus.subscribe, 0, AT(LOADADDR(.ebus.namespace) + SIZEOF(.ebus.namespace)))
    {
        . = ALIGN(4);
        __ebus_slot_subscriber_start = .;
        KEEP(*(SORT(.__ebus_slot_subscriber*)));
        __ebus_slot_subscriber_end = .;
    }
#else
/DISCARD/ :
{
        KEEP(*(SORT(.__ebus_slot_subscriber*)));
}
#endif
* zephyr/CMakeLists.txt
```cmake

...

if(CONFIG_EBUS)
  add_custom_command(
    OUTPUT ebus_hash.c ebus_hash_preprocessed.c ebus_hash.gperf
    COMMAND ${PYTHON_EXECUTABLE}
            ${ZEPHYR_BASE}/subsys/ebus/gen_ebus.py
            --kernel $<TARGET_FILE:${ZEPHYR_PREBUILT_EXECUTABLE}>
            --gperf ebus_hash.gperf
            $<$<BOOL:${CMAKE_VERBOSE_MAKEFILE}>:--verbose>
    COMMAND ${GPERF}
            --output-file ebus_hash_preprocessed.c
            ebus_hash.gperf
    COMMAND
          ${PYTHON_EXECUTABLE}
          ${ZEPHYR_BASE}/scripts/process_gperf.py
          -i ebus_hash_preprocessed.c
          -o ebus_hash.c
          -p "struct ebus_entry"
          $<$<BOOL:${CMAKE_VERBOSE_MAKEFILE}>:--verbose>
    DEPENDS ${ZEPHYR_PREBUILT_EXECUTABLE} ${ZEPHYR_BASE}/subsys/ebus/gen_ebus.py
    )
  set_property(GLOBAL APPEND PROPERTY GENERATED_KERNEL_SOURCE_FILES ebus_hash.c)
endif()
...

enum { POWER_ON, POWER_OFF, };

/ power.c / EBUS_NAMESPACE_REGISTER(power);

/ subscribe.c /

include "power.h"

/* Example subscriber for power/POWER_ON: prints the publisher's appendix as a string. */
static void power_on(uint32_t ebus, void *userdata, void *appendix)
{
    printk("power on: %s\n", (const char *)appendix);
}
EBUS_SUBSCRIBE(power, POWER_ON, power_on, NULL);

/ publish.c /

include "power.h"

static struct ebus_publish msg = {"hello", NULL}; void foo(void) { ... EBUS_PUBLISH(power, POWER_ON, &pub); ... }



### Concerns and Unresolved Questions

In `user mode`, the callback runs in kernel space, which is a security risk.
pdgendt commented 2 months ago

Not sure if there's a need for this, and I think zbus can be configured to be light-weight too? @rodrigopex

rodrigopex commented 1 month ago

@pdgendt The lightweight zbus is already there, so use listeners and disable all the unnecessary features.

rodrigopex commented 1 month ago

There are many scenarios where zbus is too heavyweight and cumbersome to use, and callback function registration is highly coupled.

@LuoZhongYao, could you please give us some examples of the problem? If possible, add them to the issue description.

LuoZhongYao commented 1 month ago

There are many scenarios where zbus is too heavyweight and cumbersome to use, and callback function registration is highly coupled.

@LuoZhongYao, could you please give us some examples of the problem? If possible, add them to the issue description.

I compared them carefully, and zbus's LISTENER is very similar to ebus. zbus is based on channels, and ebus is based on events. Maybe ebus is not needed.