Branch Framework

Keanu

This module provides a typed EventBus implementation and a local ActorSystem for message-based concurrency patterns.

EventBus

The EventBus provides a publish-subscribe messaging system with typed messages and optional topic filtering.

Basic Usage

Extend EventBus[T] for your message type:

object IntEventBus extends EventBus[Int]

Create subscribers by implementing the Subscriber[T] trait or using an anonymous function:

// Using a class
class LoggingSubscriber extends Subscriber[Int] {
  override def onMsg(msg: EventBusMessage[Int]): Unit = 
    println(s"Got message on topic '${msg.topic}': ${msg.payload}")
}

// Using an anonymous function
IntEventBus.subscribe((msg: EventBusMessage[Int]) => 
  println(s"Got message: ${msg.payload}"))

Publishing Messages

// Publish with a topic
IntEventBus.publish("calculations", 42)

// Publish without a topic
IntEventBus.publishNoTopic(42)

Filtered Subscriptions

Subscribe with a filter to only receive specific messages:

// Only receive messages with topic "important"
IntEventBus.subscribe(
  new LoggingSubscriber, 
  msg => msg.topic == "important"
)

Implementation Details

  • Each subscriber gets its own message queue and virtual thread for processing
  • Messages are processed asynchronously but in order for each subscriber
  • Subscriber message handling is wrapped in Try to prevent exceptions from crashing the processing thread
  • Subscribers can be unsubscribed using their UUID or subscriber instance

ActorSystem

The ActorSystem provides local actor-based concurrency with supervision and lifecycle management.

Creating Actors

Define actors by extending the Actor trait:

case class EchoActor() extends Actor {
  override def onMsg: PartialFunction[Any, Any] = {
    case msg => println(s"Echo: $msg")
  }
}

case class CounterActor(actorSystem: ActorSystem) extends Actor {
  private var count = 0
  
  override def onMsg: PartialFunction[Any, Any] = {
    case n: Int =>
      count += n
      actorSystem.tell[EchoActor]("echo", s"Count is now $count")
    case "get" => count
    case "print" => println(s"Count is $count")
  }
}

Setting Up the ActorSystem

// Create a new ActorSystem
val system = ActorSystem()

// Register actor types with their constructor arguments
system.registerProp(ActorProps.props[EchoActor]())
system.registerProp(ActorProps.props[CounterActor](system))

Sending Messages

// Send messages to named actor instances
system.tell[CounterActor]("counter1", 5)
system.tell[CounterActor]("counter1", "get")
system.tell[EchoActor]("echo1", "Hello!")

// Helper for repeated messages to same actor
val counter = system.tell[CounterActor]("counter1", _)
counter(1)
counter(2)
counter("print")

Key Features

  • Actors are uniquely identified by name and type
  • Automatic actor restart on failures
  • Message delivery guarantees within a single actor
  • Graceful shutdown with PoisonPill messages
  • Type-safe actor props system for constructor arguments

Lifecycle Management

The ActorSystem provides several lifecycle events and supervision features:

  • Actors automatically restart on unhandled exceptions
  • PoisonPill message for graceful termination
  • Shutdown coordination with shutdownAwait()
  • Various termination states tracked (initialization failure, interruption, etc.)

To shut down the ActorSystem:

system.shutdownAwait()

This will send PoisonPill to all actors and wait for them to terminate.