i365dev / event_radar

EventRadar is a lightweight, extensible framework for building event processing pipelines in Elixir. It provides core abstractions and minimal distribution support, allowing applications to implement their own scaling strategies.
https://www.i365.tech/
MIT License
1 star, 0 forks (source link)

EventRadar #1

Open madawei2699 opened 1 week ago

madawei2699 commented 1 week ago

EventRadar V1 Design Specification

1. Overview

EventRadar is a lightweight, extensible framework for building event monitoring and processing pipelines in Elixir.

1.1 Core Features

1.2 Project Structure

event_radar/
├── .github/
│   └── workflows/
│       ├── ci.yml
│       ├── cd.yml
│       └── docs.yml
│
├── config/
│   ├── config.exs
│   ├── dev.exs
│   ├── prod.exs
│   └── test.exs
│
├── lib/
│   ├── event_radar/
│   │   ├── application.ex
│   │   │
│   │   ├── core/
│   │   │   ├── pipeline.ex         # Pipeline behavior
│   │   │   ├── collector.ex        # Collector behavior
│   │   │   ├── processor.ex        # Processor behavior
│   │   │   └── storage.ex          # Storage behavior
│   │   │
│   │   ├── pipelines/
│   │   │   ├── supervisor.ex       # Pipeline supervisor
│   │   │   ├── registry.ex         # Pipeline registry
│   │   │   ├── builder.ex          # Pipeline builder
│   │   │   └── monitor.ex          # Pipeline monitor
│   │   │
│   │   ├── scheduler/
│   │   │   ├── scheduler.ex        # Task scheduler
│   │   │   ├── dispatcher.ex       # Task dispatcher
│   │   │   └── cron.ex            # Cron scheduler
│   │   │
│   │   ├── collectors/
│   │   │   ├── http.ex            # HTTP collector
│   │   │   ├── file.ex            # File collector
│   │   │   └── stream.ex          # Stream collector
│   │   │
│   │   ├── processors/
│   │   │   ├── filter.ex          # Filter processor
│   │   │   ├── transformer.ex     # Transformer processor
│   │   │   └── batch.ex           # Batch processor
│   │   │
│   │   ├── storage/
│   │   │   ├── memory.ex          # Memory storage
│   │   │   └── disk.ex            # Disk storage
│   │   │
│   │   ├── plugins/
│   │   │   ├── loader.ex          # Plugin loader
│   │   │   └── manager.ex         # Plugin manager
│   │   │
│   │   └── monitoring/
│   │       ├── telemetry.ex       # Telemetry integration
│   │       ├── metrics.ex         # Metrics collection
│   │       └── logger.ex          # Logging utilities
│   │
│   └── event_radar.ex             # Main module
│
├── test/
│   ├── event_radar/
│   │   ├── core/
│   │   ├── pipelines/
│   │   └── scheduler/
│   │
│   ├── support/
│   │   ├── case.ex
│   │   └── factory.ex
│   │
│   └── test_helper.exs
│
├── mix.exs
└── README.md

2. Core Abstractions

2.1 Pipeline

defmodule EventRadar.Core.Pipeline do
  @moduledoc """
  Defines the core pipeline behavior for event processing.

  The module is also a GenServer: `start_link/1` starts a process that
  initializes the configured collector, processors and storage and keeps
  them in a `%EventRadar.Core.Pipeline{}` state.
  """

  use GenServer

  # NOTE(review): placeholder component types. The concrete shapes depend on
  # init_collector/1, init_processors/1 and init_storage/1, which are not shown.
  @type collector :: term()
  @type processor :: term()
  @type storage :: term()
  # `config` is accessed as a map (config.id, config.collector, ...).
  @type config :: map()
  # init/1 sets this to :initialized.
  @type state :: atom()
  @type event :: term()

  @type t :: %__MODULE__{
          id: String.t(),
          collector: collector(),
          processors: [processor()],
          storage: storage(),
          config: config(),
          state: state()
        }

  @callback start_link(config()) :: GenServer.on_start()
  @callback process_event(event()) :: {:ok, event()} | {:error, term()}

  defstruct [:id, :collector, :processors, :storage, :config, :state]

  @doc """
  Starts a pipeline process for `config`, named via `via_tuple(config.id)`.
  """
  def start_link(config) do
    GenServer.start_link(__MODULE__, config, name: via_tuple(config.id))
  end

  @impl true
  def init(config) do
    with {:ok, collector} <- init_collector(config.collector),
         {:ok, processors} <- init_processors(config.processors),
         {:ok, storage} <- init_storage(config.storage) do
      {:ok,
       %__MODULE__{
         id: config.id,
         collector: collector,
         processors: processors,
         storage: storage,
         config: config,
         state: :initialized
       }}
    else
      # GenServer.init/1 must return {:stop, reason} to abort startup;
      # returning {:error, reason} would be reported as :bad_return_value.
      {:error, reason} -> {:stop, reason}
    end
  end
end

2.2 Collector

defmodule EventRadar.Core.Collector do
  @moduledoc """
  Defines the collector behavior for gathering events.

  `use EventRadar.Core.Collector` declares this behaviour in the calling
  module and injects an overridable `validate_config/1` that accepts any
  config.
  """

  @type config :: map()
  @type event :: term()

  @doc "Gathers events using `config`."
  @callback collect(config()) :: {:ok, [event()]} | {:error, term()}

  @doc "Checks `config`; returns `:ok` or `{:error, reason}`."
  @callback validate_config(config()) :: :ok | {:error, term()}

  defmacro __using__(_opts) do
    quote do
      # unquote(__MODULE__) is evaluated in the macro, i.e. this behaviour module.
      @behaviour unquote(__MODULE__)

      # Permissive default; implementations may override it.
      def validate_config(_config), do: :ok

      defoverridable validate_config: 1
    end
  end
end

2.3 Processor

defmodule EventRadar.Core.Processor do
  @moduledoc """
  Defines the processor behavior for handling events.

  `use EventRadar.Core.Processor` declares this behaviour in the calling
  module and injects an overridable `validate_config/1` that accepts any
  config.
  """

  @type config :: map()
  @type event :: term()

  @doc "Handles one `event` according to `config`."
  @callback process(event(), config()) :: {:ok, event()} | {:error, term()}

  @doc "Checks `config`; returns `:ok` or `{:error, reason}`."
  @callback validate_config(config()) :: :ok | {:error, term()}

  defmacro __using__(_opts) do
    quote do
      # unquote(__MODULE__) is evaluated in the macro, i.e. this behaviour module.
      @behaviour unquote(__MODULE__)

      # Permissive default; implementations may override it.
      def validate_config(_config), do: :ok

      defoverridable validate_config: 1
    end
  end
end

3. Pipeline Management

3.1 Registry

defmodule EventRadar.Pipelines.Registry do
  @moduledoc """
  Manages pipeline registration and discovery.

  Pipeline configurations are held in the server state as a map keyed by
  each config's `:id`.
  """

  use GenServer

  # Client API

  @doc """
  Starts the registry, registered locally under this module's name.

  `opts` is passed to `init/1` and currently ignored.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Registers `pipeline_config` under its `:id`.

  Returns `:ok`, `{:error, :already_registered}` if the id is taken, or
  `{:error, :missing_id}` if the config has no `:id` key.
  """
  def register_pipeline(pipeline_config) do
    GenServer.call(__MODULE__, {:register, pipeline_config})
  end

  @doc "Returns `{:ok, config}` for `id`, or `{:error, :not_found}`."
  def get_pipeline(id) do
    GenServer.call(__MODULE__, {:get, id})
  end

  @doc "Returns all registered pipeline configurations."
  def list_pipelines do
    GenServer.call(__MODULE__, :list)
  end

  # Server callbacks

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_call({:register, %{id: id} = config}, _from, pipelines) do
    if Map.has_key?(pipelines, id) do
      {:reply, {:error, :already_registered}, pipelines}
    else
      {:reply, :ok, Map.put(pipelines, id, config)}
    end
  end

  def handle_call({:register, _config}, _from, pipelines) do
    {:reply, {:error, :missing_id}, pipelines}
  end

  def handle_call({:get, id}, _from, pipelines) do
    reply =
      case Map.fetch(pipelines, id) do
        {:ok, config} -> {:ok, config}
        :error -> {:error, :not_found}
      end

    {:reply, reply, pipelines}
  end

  def handle_call(:list, _from, pipelines) do
    {:reply, Map.values(pipelines), pipelines}
  end
end

3.2 Builder

defmodule EventRadar.Pipelines.Builder do
  @moduledoc """
  Builds pipeline instances from configuration.

  The config's collector, processors and storage are validated in that
  order; the first non-`:ok` validation result is returned unchanged.
  """

  @doc """
  Validates `config` and, when valid, returns whatever `create_pipeline/1`
  returns for it.
  """
  def build(config) do
    with :ok <- validate_config(config), do: create_pipeline(config)
  end

  # Runs the three component validators in sequence and stops at the first
  # result that is not :ok.
  defp validate_config(config) do
    with :ok <- validate_collector(config.collector),
         :ok <- validate_processors(config.processors) do
      validate_storage(config.storage)
    end
  end
end

4. Scheduling System

4.1 Task Scheduler

defmodule EventRadar.Scheduler do
  @moduledoc """
  Handles task scheduling and execution.

  The server state maps each task id to its schedule. Scheduling a task id
  that is already present replaces its schedule.

  NOTE(review): executing tasks according to their schedule is not
  implemented in this module yet.
  """

  use GenServer

  # Client API

  @doc "Starts the scheduler, registered locally under this module's name."
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Records `schedule` for `task_id`. Returns `:ok`."
  def schedule_task(task_id, schedule) do
    GenServer.call(__MODULE__, {:schedule, task_id, schedule})
  end

  @doc "Removes `task_id`. Returns `:ok`, or `{:error, :not_found}`."
  def cancel_task(task_id) do
    GenServer.call(__MODULE__, {:cancel, task_id})
  end

  # Server callbacks

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_call({:schedule, task_id, schedule}, _from, tasks) do
    {:reply, :ok, Map.put(tasks, task_id, schedule)}
  end

  def handle_call({:cancel, task_id}, _from, tasks) do
    if Map.has_key?(tasks, task_id) do
      {:reply, :ok, Map.delete(tasks, task_id)}
    else
      {:reply, {:error, :not_found}, tasks}
    end
  end
end

5. Plugin System

5.1 Plugin Manager

defmodule EventRadar.Plugins.Manager do
  @moduledoc """
  Manages plugin loading and lifecycle.
  """

  @doc """
  Validates and then registers `plugin_module`.

  Returns `{:ok, plugin_module}`, or the first non-`:ok` result from
  `validate_plugin/1` or `register_plugin/1`.
  """
  def load_plugin(plugin_module) do
    with :ok <- validate_plugin(plugin_module),
         :ok <- register_plugin(plugin_module) do
      {:ok, plugin_module}
    end
  end

  @doc """
  Unloads `plugin_module`.

  Not implemented yet: always returns `{:error, :not_implemented}`, so
  callers get a tagged result rather than `nil`.
  """
  def unload_plugin(_plugin_module) do
    # TODO: implement unloading (the reverse of load_plugin/1).
    {:error, :not_implemented}
  end
end

6. Example Usage

# Define a custom collector
defmodule MyApp.CustomCollector do
  use EventRadar.Core.Collector

  @impl true
  def collect(_config) do
    # Implementation goes here. Returning an empty batch keeps this example
    # within the `{:ok, [event]} | {:error, term}` callback contract.
    {:ok, []}
  end
end

# Define a custom processor
defmodule MyApp.CustomProcessor do
  use EventRadar.Core.Processor

  @impl true
  def process(event, _config) do
    # Implementation goes here. This example passes the event through
    # unchanged so it satisfies the `{:ok, event}` callback contract.
    {:ok, event}
  end
end

# Create and start a pipeline from one collector, a list of processors and
# one storage backend, each given as {module, options}.
collector = {MyApp.CustomCollector, %{url: "https://api.example.com"}}
processors = [{MyApp.CustomProcessor, %{threshold: 0.5}}]
storage = {EventRadar.Storage.Memory, %{}}

config = %{
  id: "my_pipeline",
  collector: collector,
  processors: processors,
  storage: storage
}

{:ok, pipeline} = EventRadar.create_pipeline(config)
EventRadar.start_pipeline(pipeline)

7. Testing Strategy

defmodule EventRadar.Test.Support.Factory do
  @moduledoc """
  ExMachina factories for EventRadar test data.
  """

  use ExMachina

  @doc """
  Builds an initialized `EventRadar.Core.Pipeline` struct with a sequential
  `"pipeline_<n>"` id, one processor and an empty config.
  """
  # NOTE(review): this relies on :collector, :processor and :storage factories
  # that are not defined in this module; confirm they exist.
  def pipeline_factory do
    # Evaluated in the same order as the struct fields, so sequence numbers
    # are consumed in the same order.
    id = sequence(:id, fn n -> "pipeline_#{n}" end)
    collector = build(:collector)
    processors = [build(:processor)]
    storage = build(:storage)

    %EventRadar.Core.Pipeline{
      id: id,
      collector: collector,
      processors: processors,
      storage: storage,
      config: %{},
      state: :initialized
    }
  end
end
madawei2699 commented 4 days ago

EventRadar V2 Design Specification

1. Overview

EventRadar is a lightweight, extensible framework for building event processing pipelines in Elixir. It provides core abstractions and minimal distribution support, allowing applications to implement their own scaling strategies.

1.1 Core Features

1.2 Architecture Overview

graph TB
    subgraph EventRadar ["EventRadar Core"]
        direction TB
        subgraph Behaviors ["Core Behaviors"]
            PB[Pipeline]
            CB[Collector]
            PRB[Processor]
            DB[Distribution]
        end

        subgraph Types ["Core Types"]
            E[Event]
            C[Config]
            R[Result]
        end

        subgraph Runtime ["Basic Runtime"]
            PS[PipelineSupervisor]
            PR[PipelineRegistry]
            LD[LocalDistribution]
        end
    end

    subgraph Application ["Application Layer"]
        subgraph Components ["Custom Components"]
            CP[Custom Pipeline]
            CC[Custom Collector]
            CPR[Custom Processor]
            CD[Custom Distribution]
        end

        subgraph AppRuntime ["App Runtime"]
            AS[App Supervisor]
            PF[Pipeline Factory]
        end

        PB --> CP
        CB --> CC
        PRB --> CPR
        DB --> CD

        AS --> PF
        PF --> CP
    end

    classDef core fill:#f9f,stroke:#333,stroke-width:2px
    classDef type fill:#bbf,stroke:#333
    classDef runtime fill:#bfb,stroke:#333
    classDef app fill:#fbb,stroke:#333

1.3 Pipeline Operation Flow

sequenceDiagram
    participant App as Application
    participant ER as EventRadar
    participant D as Distribution
    participant PS as PipelineSupervisor
    participant P as Pipeline
    participant C as Collector
    participant PR as Processor

    %% Pipeline Creation
    rect rgb(200, 223, 255)
        Note over App,PR: Pipeline Creation Flow
        App->>ER: start_pipeline(config, distribution)
        ER->>D: distribute_pipeline(id, config)
        D->>PS: start_pipeline(config)
        PS->>P: start_link(config)
        P->>C: init(collector_config)
        P->>PR: init(processor_config)
    end

    %% Event Processing
    rect rgb(255, 223, 200)
        Note over App,PR: Event Processing Flow
        P->>C: collect()
        C-->>P: {:ok, events}
        loop For each event
            P->>PR: process(event)
            PR-->>P: {:ok, processed_event}
            P->>P: handle_event(processed_event)
        end
    end

1.4 Project Structure

event_radar/
├── lib/
│   ├── event_radar/
│   │   ├── core/
│   │   │   ├── pipeline.ex       # Pipeline behavior
│   │   │   ├── collector.ex      # Collector behavior
│   │   │   ├── processor.ex      # Processor behavior
│   │   │   └── distribution.ex   # Distribution behavior
│   │   │
│   │   ├── types/
│   │   │   ├── event.ex         # Event type
│   │   │   ├── config.ex        # Config type
│   │   │   └── result.ex        # Result type
│   │   │
│   │   ├── runtime/
│   │   │   ├── supervisor.ex    # Pipeline supervisor
│   │   │   ├── registry.ex      # Pipeline registry
│   │   │   └── local.ex         # Local distribution implementation
│   │   │
│   │   └── utils/
│   │       ├── telemetry.ex     # Telemetry integration
│   │       └── logger.ex        # Logging utilities
│   │
│   └── event_radar.ex           # Main module

2. Core Abstractions

2.1 Pipeline

defmodule EventRadar.Core.Pipeline do
  @moduledoc """
  Defines the core behavior for event processing pipelines.

  Implementations build their state in `c:init/1` and receive each
  processed event in `c:handle_event/2`.
  """

  # Without this alias, `Config.t()` / `Event.t()` would refer to top-level
  # `Config` / `Event` modules instead of the EventRadar types.
  alias EventRadar.Types.{Config, Event}

  @doc "Initializes pipeline state from its configuration."
  @callback init(config :: Config.t()) :: {:ok, state :: term()} | {:error, term()}

  @doc "Handles one processed event with the current pipeline state."
  @callback handle_event(event :: Event.t(), state :: term()) ::
              {:ok, Event.t()} | {:error, term()}
end

2.2 Collector

defmodule EventRadar.Core.Collector do
  @moduledoc """
  Defines the behavior for event collectors.

  A collector builds its state in `c:init/1` and returns batches of events
  from `c:collect/1`.
  """

  # Without this alias, `Event.t()` would refer to a top-level `Event` module.
  alias EventRadar.Types.Event

  @doc "Initializes collector state from its configuration."
  @callback init(config :: term()) :: {:ok, state :: term()} | {:error, term()}

  @doc "Collects the next batch of events."
  @callback collect(state :: term()) :: {:ok, [Event.t()]} | {:error, term()}
end

2.3 Processor

defmodule EventRadar.Core.Processor do
  @moduledoc """
  Defines the behavior for event processors.

  A processor builds its state in `c:init/1` and transforms one event at a
  time in `c:process/2`.
  """

  # Without this alias, `Event.t()` would refer to a top-level `Event` module.
  alias EventRadar.Types.Event

  @doc "Initializes processor state from its configuration."
  @callback init(config :: term()) :: {:ok, state :: term()} | {:error, term()}

  @doc "Processes a single event."
  @callback process(event :: Event.t(), state :: term()) ::
              {:ok, Event.t()} | {:error, term()}
end

2.4 Distribution

defmodule EventRadar.Core.Distribution do
  @moduledoc """
  Defines the behavior for pipeline distribution.

  A distribution decides where a pipeline process is started and how to
  react to node changes (`:up` / `:down`).
  """

  # Without this alias, `Config.t()` would refer to a top-level `Config` module.
  alias EventRadar.Types.Config

  @doc "Starts pipeline `pipeline_id` from `config`, returning its pid."
  @callback distribute_pipeline(pipeline_id :: term(), config :: Config.t()) ::
              {:ok, pid()} | {:error, term()}

  @doc "Reacts to `node` going `:up` or `:down`."
  @callback handle_node_change(node :: node(), action :: :up | :down) ::
              :ok | {:error, term()}
end

3. Type System

3.1 Event

defmodule EventRadar.Types.Event do
  @moduledoc """
  A single event flowing through a pipeline.

  `metadata` defaults to an empty map; every other field defaults to `nil`.
  """

  @type t :: %__MODULE__{
          id: term(),
          source: term(),
          data: term(),
          metadata: map()
        }

  defstruct id: nil, source: nil, data: nil, metadata: %{}
end

3.2 Config

defmodule EventRadar.Types.Config do
  @moduledoc """
  Configuration for a single pipeline.

    * `:id` - pipeline identifier
    * `:module` - the pipeline module a distribution starts for this config
      (`EventRadar.Runtime.LocalDistribution` reads `config.module`);
      defaults to `nil`
    * `:collector` - `{collector_module, collector_config}`
    * `:processors` - list of `{processor_module, processor_config}`
    * `:metadata` - free-form map, defaults to `%{}`
  """

  @type collector_config :: {module(), term()}
  @type processor_config :: {module(), term()}

  @type t :: %__MODULE__{
          id: term(),
          module: module() | nil,
          collector: collector_config(),
          processors: [processor_config()],
          metadata: map()
        }

  defstruct [:id, :module, :collector, :processors, metadata: %{}]
end

4. Runtime Support

4.1 Pipeline Supervisor

defmodule EventRadar.Runtime.Supervisor do
  @moduledoc """
  Top-level runtime supervisor.

  Starts a unique-key `Registry` (`EventRadar.Runtime.Registry`) and a
  `DynamicSupervisor` (`EventRadar.Runtime.PipelineSupervisor`) under a
  `:one_for_one` strategy.
  """

  use Supervisor

  @doc "Starts the supervisor, registered under this module's name."
  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    registry = {Registry, keys: :unique, name: EventRadar.Runtime.Registry}
    pipelines = {DynamicSupervisor, name: EventRadar.Runtime.PipelineSupervisor}

    Supervisor.init([registry, pipelines], strategy: :one_for_one)
  end
end

4.2 Local Distribution

defmodule EventRadar.Runtime.LocalDistribution do
  @moduledoc """
  Default distribution: starts every pipeline on the local node as a child
  of `EventRadar.Runtime.PipelineSupervisor`.
  """

  @behaviour EventRadar.Core.Distribution

  @doc """
  Starts `{config.module, config}` under the local pipeline supervisor.

  Returns the `DynamicSupervisor.start_child/2` result, or
  `{:error, :missing_pipeline_module}` when `config` has no usable
  `:module` (instead of raising `KeyError`, as plain `config.module` would).
  """
  @impl true
  def distribute_pipeline(_pipeline_id, config) do
    case Map.get(config, :module) do
      module when is_atom(module) and not is_nil(module) ->
        DynamicSupervisor.start_child(
          EventRadar.Runtime.PipelineSupervisor,
          {module, config}
        )

      _ ->
        {:error, :missing_pipeline_module}
    end
  end

  # Single-node distribution: node changes need no action.
  @impl true
  def handle_node_change(_node, _action), do: :ok
end

5. Application Layer Examples

5.1 Large-Scale Pipeline Management

graph TB
    subgraph Application ["Application Layer"]
        direction TB
        subgraph Distribution ["Custom Distribution"]
            DM[Distribution Manager]
            LB[Load Balancer]
            PH[Pipeline Health Check]
        end

        subgraph Management ["Pipeline Management"]
            PC[Pipeline Config Store]
            PF[Pipeline Factory]
            PM[Pipeline Monitor]
        end

        subgraph Scaling ["Auto Scaling"]
            SS[Scaling Strategy]
            subgraph Partitions
                P1[Partition 1]
                P2[Partition 2]
                P3[Partition N]
            end
        end

        DM --> LB
        DM --> PH
        PF --> PC
        PF --> PM
        SS --> Partitions
    end

5.2 Basic Pipeline Example

defmodule MyApp.Pipeline do
  @moduledoc """
  Minimal example pipeline: keeps its config as state and returns each
  event unchanged.
  """

  @behaviour EventRadar.Core.Pipeline

  @impl true
  def init(config) do
    {:ok, config}
  end

  @impl true
  def handle_event(event, _state) do
    # Process the event here; this example passes it through unchanged.
    {:ok, event}
  end
end

# Start pipeline: build the config, then start it without naming a
# distribution module.
config =
  struct!(EventRadar.Types.Config,
    id: "my_pipeline",
    collector: {MyApp.Collector, %{}},
    processors: [{MyApp.Processor, %{}}]
  )

EventRadar.start_pipeline(config)

5.3 Distributed Pipeline Example

defmodule MyApp.ClusterDistribution do
  @moduledoc """
  Example cluster distribution: hashes the pipeline id to a partition,
  selects a node for that partition and starts the pipeline there.
  """

  @behaviour EventRadar.Core.Distribution

  # Without this alias, `PartitionManager` would refer to a nonexistent
  # top-level module.
  alias MyApp.PartitionManager

  @impl true
  def distribute_pipeline(pipeline_id, config) do
    with {:ok, partition} <- PartitionManager.assign_partition(pipeline_id),
         {:ok, node} <- select_node(partition),
         {:ok, pid} <- start_on_node(node, config) do
      {:ok, pid}
    end
  end

  @impl true
  def handle_node_change(node, :down) do
    redistribute_pipelines(node)
  end

  # The callback also allows :up; nothing is redistributed when a node
  # joins (previously this raised FunctionClauseError).
  def handle_node_change(_node, :up), do: :ok
end

# Pipeline configuration store
defmodule MyApp.PipelineConfig do
  @moduledoc """
  Stores pipeline configurations in a named, public ETS table.

  Call `create_table/0` before `store/2`. An ETS table is owned by the
  process that creates it and is deleted when that process exits, so create
  it from a long-lived process.
  """

  # Name of the ETS table backing this store.
  @ets_table :my_app_pipeline_configs

  @doc """
  Creates the ETS table if it does not exist yet. Returns `:ok`.

  NOTE(review): the existence check and creation are not atomic; call this
  from a single process.
  """
  def create_table do
    if :ets.whereis(@ets_table) == :undefined do
      :ets.new(@ets_table, [:set, :public, :named_table, read_concurrency: true])
    end

    :ok
  end

  @doc "Stores `config` under `pipeline_id`, replacing any previous entry. Returns `true`."
  def store(pipeline_id, config) do
    :ets.insert(@ets_table, {pipeline_id, config})
  end

  @doc "Returns `{:ok, config}` for `pipeline_id`, or `:error` if none is stored."
  def fetch(pipeline_id) do
    case :ets.lookup(@ets_table, pipeline_id) do
      [{_key, config}] -> {:ok, config}
      [] -> :error
    end
  end
end

# Partition management
defmodule MyApp.PartitionManager do
  @moduledoc """
  Maps pipeline ids onto a fixed number of partitions by hashing.
  """

  # NOTE(review): placeholder default; confirm the intended partition count.
  # It can be overridden with `config :my_app, :partition_count, n`.
  @default_partition_count 8

  @doc """
  Assigns `pipeline_id` to a partition in `0..count-1` using
  `:erlang.phash2/2`, so the same id always maps to the same partition.

  Returns `{:ok, partition}`, or `{:error, :invalid_partition_count}` when
  `count` is not an integer in `1..2^32` (the range `phash2/2` accepts).
  """
  def assign_partition(pipeline_id, count \\ partition_count())

  def assign_partition(pipeline_id, count)
      when is_integer(count) and count in 1..4_294_967_296 do
    {:ok, :erlang.phash2(pipeline_id, count)}
  end

  def assign_partition(_pipeline_id, _count), do: {:error, :invalid_partition_count}

  # Configured partition count, falling back to @default_partition_count.
  defp partition_count do
    Application.get_env(:my_app, :partition_count, @default_partition_count)
  end
end

# Health monitoring
defmodule MyApp.PipelineMonitor do
  @moduledoc """
  Example health monitor: sends itself `:check_health` every 30 seconds
  and calls `check_all_pipelines/0` (not defined in this snippet) on each
  tick. The state is passed through unchanged.
  """

  use GenServer

  @check_interval_ms 30_000

  @impl true
  def init(state) do
    # Repeating timer owned by this process; its reference is not kept.
    :timer.send_interval(@check_interval_ms, :check_health)
    {:ok, state}
  end

  @impl true
  def handle_info(:check_health, state) do
    check_all_pipelines()
    {:noreply, state}
  end
end

5.4 Real-World Example: Twitter Monitor

# Twitter collector
defmodule MyApp.TwitterCollector do
  @moduledoc """
  Example tweet collector. The Twitter API calls are placeholders.
  """

  @behaviour EventRadar.Core.Collector

  @impl true
  def init(config) do
    # Initialize the Twitter API client here; the config is kept as state.
    {:ok, config}
  end

  @impl true
  def collect(_state) do
    # Collect tweets here. Until implemented, report no new events so the
    # return value still matches the `{:ok, [Event.t()]}` contract.
    {:ok, []}
  end
end

# Tweet processor
defmodule MyApp.TweetProcessor do
  @moduledoc """
  Example tweet processor: keeps its config as state and returns each
  tweet event unchanged.
  """

  @behaviour EventRadar.Core.Processor

  # init/1 is a required callback of EventRadar.Core.Processor.
  @impl true
  def init(config), do: {:ok, config}

  @impl true
  def process(event, _state) do
    # Process the tweet here; this example passes it through unchanged.
    {:ok, event}
  end
end

# Start monitoring
defmodule MyApp.TwitterMonitor do
  @moduledoc """
  Starts a Twitter monitoring pipeline for a single user, distributed with
  `MyApp.ClusterDistribution`.
  """

  @doc """
  Builds the config for pipeline `"twitter_<user_id>"`, whose collector
  receives `user_id` and `keywords`, and starts it.
  """
  def start_monitoring(user_id, keywords) do
    collector_opts = %{user_id: user_id, keywords: keywords}

    %EventRadar.Types.Config{
      id: "twitter_#{user_id}",
      collector: {MyApp.TwitterCollector, collector_opts},
      processors: [{MyApp.TweetProcessor, %{}}]
    }
    |> EventRadar.start_pipeline(MyApp.ClusterDistribution)
  end
end

6. Testing Support

6.1 Test Helpers

defmodule EventRadar.TestSupport do
  @moduledoc """
  Helpers for starting pipelines in tests.
  """

  @doc """
  Starts a test pipeline from a default `EventRadar.Types.Config` (a
  `TestCollector` collector and no processors), overridden by the keys in
  `config`.
  """
  def create_test_pipeline(config \\ %{}) do
    default_config = %EventRadar.Types.Config{
      # Unique per VM, so pipelines started by different tests never share
      # an id (a random number in 1..1000 can collide).
      id: "test_pipeline_#{System.unique_integer([:positive])}",
      collector: {TestCollector, %{}},
      processors: []
    }

    config = Map.merge(default_config, config)
    EventRadar.start_pipeline(config)
  end
end

6.2 Example Test

defmodule MyApp.PipelineTest do
  use ExUnit.Case

  test "pipeline processes events correctly" do
    # NOTE(review): placeholder input; the shape MyCollector expects for
    # `:data` is not shown here.
    test_data = [%{id: 1, payload: "hello"}]

    {:ok, pipeline} =
      EventRadar.TestSupport.create_test_pipeline(%{
        collector: {MyCollector, %{data: test_data}},
        processors: [{MyProcessor, %{}}]
      })

    # Test pipeline behavior
    assert {:ok, _events} = EventRadar.get_processed_events(pipeline)
  end
end

7. Key Design Decisions

  1. Minimal Core Abstractions

    • Focus on essential behaviors
    • Clear interface definitions
    • Typespecs for documented contracts (checkable with Dialyzer)
  2. Distribution Support

    • Minimal distribution abstraction
    • Default local implementation
    • Extensible for custom strategies
  3. Application Layer Freedom

    • Custom distribution strategies
    • Flexible scaling approaches
    • Custom monitoring solutions
  4. Runtime Support

    • Basic supervision
    • Pipeline registry
    • Simple process management
  5. Type System

    • Clear event structure
    • Flexible configuration
    • Extensible metadata

The key improvements from V1 include:

  1. Clearer separation between framework and application concerns
  2. Addition of distribution abstraction
  3. Support for large-scale deployments
  4. More flexible pipeline management
  5. Better testing support