Chapter 3: Publisher-Subscriber Principle¶
The publisher/subscriber principle is available in both Layer 2 profiles. A publisher is an output endpoint that fans events out to a list of subscribers. A subscriber is any object that implements send(event).
LiteLayer2 covers the fan-out baseline: every event is delivered to every subscriber in registration order, with no further routing logic (DSLitePub / DSLiteSub).
PubSubLayer2 extends that baseline with routing richness: delivery tiers, condition filtering, and composable circuits. All PubSubLayer2 components — queues, resources, processes — are built on top of these routing primitives.
3.1 Sending Events Directly¶
A publisher is a routing convenience — it is not required. Events can be delivered to any subscriber directly using three simulation-level calls, each with different timing and safety characteristics.
sim.send_object(subscriber, event) — immediate, synchronous¶
Calls subscriber.send(event) in-line before returning. The engine temporarily switches the process context to the receiver and restores it when the call completes.
cb = sim.callback(lambda e: print("got:", e))
sim.send_object(cb, "hello") # prints immediately: got: hello
This is the most performant path — no queue overhead, no extra dispatch cycle. However it is only safe when the receiver does not signal back to the caller during the same call. If the receiver re-enters the caller (directly or through a chain), Python raises ValueError: generator already executing — a cyclic coroutine dependency. Use it only when data flows strictly one way.
sim.signal(event, subscriber) — zero delay, dispatch-safe¶
Enqueues the event onto the now-queue. It is delivered at the current simulation time but in a separate dispatch step, after the current step completes. No cyclic dependency is possible.
proc = sim.process(my_coro()).schedule(0)
sim.signal("hello", proc) # proc receives "hello" at t=sim.time, after this step
Use this when the receiver may signal back, or when strict same-time ordering between independent steps matters.
sim.schedule_event(delay, event, subscriber) — future delivery, dispatch-safe¶
Enqueues the event onto the time queue for delivery at sim.time + delay. The receiver runs in a future dispatch step when time advances to that point.
proc = sim.process(my_coro()).schedule(0)
sim.schedule_event(5, "alarm", proc) # proc receives "alarm" at t=sim.time + 5
Use this for timer-driven wake-ups, timeouts, or any case where the event should arrive after a known delay.
Summary¶
| Method | Delivery time | Dispatch step | Safe if receiver signals back? |
|---|---|---|---|
sim.send_object(sub, event) |
Immediate | Same — synchronous | No — cyclic risk |
sim.signal(event, sub) |
sim.time (now) |
Next, in same time bucket | Yes |
sim.schedule_event(delay, event, sub) |
sim.time + delay |
Future time bucket | Yes |
3.2 Subscriber Types¶
A subscriber is any object that implements send(event). DSSim provides three ready-made subscriber types:
Callback¶
A callback wraps a plain Python callable. send(event) calls the function and returns its return value. The return value matters in the CONSUME tier: a truthy return value means the event was accepted.
cb = sim.callback(lambda e: print("got:", e)) # return value is None → falsy
cb = sim.callback(lambda e: e > 0) # return value is bool → used for consume logic
Callbacks are synchronous — they run to completion during the dispatch step that called them.
Coroutine / Process¶
A process wraps a generator or coroutine. send(event) resumes the suspended coroutine at its current yield / await point, passing the event as the return value of the wait.
async def handler():
event = await sim.wait(timeout=10) # suspended here
print("woken by:", event)
proc = sim.process(handler()).schedule(0)
# When proc.send("ping") is called, the coroutine resumes and prints: woken by: ping
Processes can suspend and resume many times. Each wait or sleep call is a suspension point; the simulation engine calls send() when the next event is due.
ISubscriber / custom subscriber¶
Any object with a send(event) method qualifies. The formal interface is ISubscriber from dssim.base:
from dssim.base import ISubscriber
class MyEndpoint(ISubscriber):
def send(self, event):
... # handle event; return value used in CONSUME tier
This is an open protocol — third-party components, migrated SimPy processes, and hardware model stubs all integrate through the same interface.
Prefer ISubscriber in LiteLayer2
In LiteLayer2, calling sim.callback(fn) wraps a plain function in a
DSLiteCallback just to satisfy the subscriber interface. If you control
the receiving class, implementing ISubscriber directly removes that
translation layer — one fewer object, one fewer indirection per event.
3.3 Publishers and Subscribers¶
A publisher is a named output endpoint that delivers events to all registered subscribers. Use one when you need fan-out, tier routing, or decoupled wiring between components.
flowchart LR
P["Publisher\n(output endpoint)"]
S1["Subscriber A\n(input endpoint)"]
S2["Subscriber B\n(input endpoint)"]
S3["Subscriber C\n(input endpoint)"]
P -->|event| S1
P -->|event| S2
P -->|event| S3
Create a publisher via the simulation factory:
tx = sim.publisher(name="my_component.tx")
Add a subscriber and fire an event:
cb = sim.callback(lambda e: print("received:", e))
tx.add_subscriber(cb)
tx.signal("hello") # prints: received: hello
PubSubLayer2
In PubSubLayer2, add_subscriber accepts an optional delivery tier argument (default is CONSUME):
tx.add_subscriber(cb, tx.Phase.CONSUME)
In LiteLayer2, add_subscriber takes only the subscriber; there are no tiers.
3.4 Component Interfaces¶
Publishers and subscribers are the natural way to define a component's interface. An output endpoint sends events out — in routing terms, this is the publisher. An input endpoint accepts incoming events — in routing terms, this is the subscriber.
The two vocabularies refer to the same objects. Software engineers tend to think in terms of publishers and subscribers — the names used throughout the DSSim API (add_subscriber, DSPub, DSSub). Hardware engineers tend to think in terms of output and input ports, or endpoints — the language of signal interfaces and bus wiring diagrams. Both are correct descriptions of the same connection.
flowchart LR
subgraph A[Component A]
AT["internal logic"]
AO(("tx\n(output endpoint\n/ publisher)"))
AT --> AO
end
subgraph B[Component B]
BI(("rx\n(input endpoint\n/ subscriber)"))
BT["internal logic"]
BI --> BT
end
AO -->|event| BI
To connect two components, register B's subscriber (rx) on A's publisher (tx):
A.tx.add_subscriber(B.rx)
That single line wires the two components together. Every event A sends on tx is delivered to B.rx. Neither component knows about the other's implementation — they only share the endpoint contract.
This composability is the core value of the publisher-subscriber principle: you can connect your component to a third-party component by subscribing to its output endpoint, or expose your own output endpoint so others can subscribe to it, without any adapter code.
3.5 Subscription Tiers¶
PubSubLayer2
Subscription tiers are a PubSubLayer2-specific feature. LiteLayer2's DSLitePub delivers every event to all subscribers unconditionally.
A PubSubLayer2 publisher organizes its subscribers into four ordered phases, called tiers:
| Tier | Who uses it | Payload | Semantics |
|---|---|---|---|
PRE |
Observers, loggers, probes | original event | Always called; cannot consume the event |
CONSUME |
Active consumers | original event | First to accept claims the event; rest are skipped |
POST_HIT |
Reaction logic | {'consumer': <who>, 'event': <original>} |
Called only when a consumer accepted |
POST_MISS |
Fallback logic | original event | Called only when no consumer accepted |
POST_HIT delivers a dict rather than the raw event so the handler knows which subscriber claimed it — useful for tracing which component processed a broadcast and for debugging routing decisions.
The tier split separates concerns cleanly: monitoring in PRE, ownership decision in CONSUME, follow-on reactions in POST_*. Each tier is independent; you can have any combination of subscribers in any tier.
Event Routing Through the Tiers¶
The sequence diagram below shows how a publisher processes two events — one that is consumed and one that is not.
sequenceDiagram
participant P as Publisher
box PRE observers
participant PRE1 as PRE observer #1
participant PRE2 as PRE observer #2
end
box CONSUME consumers
participant C1 as Consumer #1
participant C2 as Consumer #2
participant C3 as Consumer #3
end
participant CEND as (end of consumers)
box POST_HIT observers
participant H1 as POST_HIT #1
participant H2 as POST_HIT #2
end
box POST_MISS observers
participant M1 as POST_MISS #1
participant M2 as POST_MISS #2
end
Note over P: Event A — consumed path
P->>PRE1: event A
PRE1->>PRE2: pass
PRE2->>C1: pass
C1->>C2: reject → pass
C2-->>P: accept (consumed)
P->>H1: POST_HIT(A)
H1->>H2: pass
Note over P: Event B — rejected path
P->>PRE1: event B
PRE1->>PRE2: pass
PRE2->>C1: pass
C1->>C2: reject → pass
C2->>C3: reject → pass
C3->>CEND: reject → pass
CEND-->>P: not consumed
P->>M1: POST_MISS(B)
M1->>M2: pass
Key rules:
- PRE always runs in full, regardless of what happens in CONSUME.
- CONSUME stops at the first accepting subscriber. Later subscribers in the same tier do not see the event.
- POST_HIT runs if and only if at least one consumer accepted.
- POST_MISS runs if and only if no consumer accepted.
Code Example¶
from dssim import DSSimulation
sim = DSSimulation()
pub = sim.publisher(name="pub")
pub.add_subscriber(sim.callback(lambda e: print("PRE:", e)), pub.Phase.PRE)
pub.add_subscriber(sim.callback(lambda e: e == "A"), pub.Phase.CONSUME)
pub.add_subscriber(sim.callback(lambda e: print("POST_HIT:", e)), pub.Phase.POST_HIT)
pub.add_subscriber(sim.callback(lambda e: print("POST_MISS:", e)), pub.Phase.POST_MISS)
sim.schedule_event(0, "A", pub)
sim.schedule_event(1, "B", pub)
sim.run(until=2)
# t=0: PRE: A → POST_HIT: A
# t=1: PRE: B → POST_MISS: B
3.6 Subscription Context Managers¶
PubSubLayer2
Subscription context managers are a PubSubLayer2-specific feature.
Calling add_subscriber / remove_subscriber manually is error-prone when a subscription should only be active for the duration of one operation. PubSubLayer2 provides context managers that subscribe the current process on entry and unsubscribe it automatically on exit — even if an exception occurs.
with sim.consume(pub):
event = await sim.wait(timeout=5)
# process is no longer subscribed to pub after the block
The four context managers correspond directly to the four tiers:
| Context manager | Tier | The process… |
|---|---|---|
sim.observe_pre(pub) |
PRE | observes every event; cannot consume |
sim.consume(pub) |
CONSUME | competes to claim events |
sim.observe_consumed(pub) |
POST_HIT | sees events that were claimed by another consumer |
sim.observe_unconsumed(pub) |
POST_MISS | sees events that no consumer claimed |
Multiple publishers¶
A single context manager can cover several publishers at once:
with sim.consume(pub_a, pub_b):
event = await sim.wait(timeout=10)
# events from either pub_a or pub_b may wake the process
Combining context managers¶
Managers with different tiers can be merged with +:
with sim.observe_pre(monitor_pub) + sim.consume(data_pub):
event = await sim.wait(timeout=10)
The combined manager subscribes and unsubscribes both in a single with block.
Nesting¶
Context managers stack naturally. Each level adds a subscription layer that is torn down when its block exits:
with sim.observe_pre(log_pub):
# subscribed to log_pub (PRE) here
with sim.consume(data_pub):
# also subscribed to data_pub (CONSUME) here
event = await sim.wait(timeout=5)
# data_pub subscription removed; log_pub still active
# log_pub subscription removed
This is particularly useful when outer context tracks a monitoring channel throughout a longer sequence while inner contexts handle individual protocol steps.
DSFuture as a publisher argument¶
A DSFuture (or DSProcess) can be passed in place of a publisher. The context manager resolves the future's internal completion publisher and subscribes to it. This is useful when you want to combine task observation with other subscriptions or use a tier other than the default:
task_a = sim.process(worker_a()).schedule(0)
task_b = sim.process(worker_b()).schedule(0)
with sim.observe_pre(task_a) + sim.observe_pre(task_b) + sim.consume(data_pub):
while True:
event = await sim.wait(timeout=50)
if event is None:
break
if task_a.finished():
print(f"task_a done at t={sim.time}")
if task_b.finished():
print(f"task_b done at t={sim.time}")
For waiting on a single future, task.wait(timeout) is the preferred shorthand — it handles subscription automatically. See Section 4.1 for the full explanation of futures as conditions.
3.7 Advanced: Notifier Policies¶
PubSubLayer2
Notifier policies are a PubSubLayer2-specific feature. LiteLayer2 always notifies subscribers in insertion order.
Within each tier, a publisher holds a notifier — an object that controls the order in which subscribers are visited when an event is dispatched. The notifier is shared across all tiers of the same publisher and is set once at construction:
from dssim import NotifierRoundRobin, NotifierPriority
pub = sim.publisher(name="my.pub", notifier=NotifierRoundRobin)
Three policies are available:
| Policy | Class | Dispatch order |
|---|---|---|
| Insertion order | NotifierDict (default) |
Subscribers are visited in the order they were added. Deterministic, O(1). |
| Round-robin | NotifierRoundRobin |
After each dispatch, the start position rotates to the subscriber after the last one visited. All subscribers get equal opportunity over time. |
| Priority | NotifierPriority |
Subscribers are visited lowest-priority-number first. The priority is given per add_subscriber call. |
NotifierDict — insertion order (default)¶
No configuration needed. Subscribers are always notified in the order they were registered:
pub = sim.publisher(name="pub") # NotifierDict is the default
pub.add_subscriber(sub_a, pub.Phase.CONSUME) # notified first
pub.add_subscriber(sub_b, pub.Phase.CONSUME) # notified second if sub_a rejects
NotifierRoundRobin — fair distribution¶
Useful when multiple identical consumers compete for events and you want each to get an equal share over time. After each dispatch cycle, the internal queue rotates so the next event starts from where the previous one stopped:
from dssim import NotifierRoundRobin
work_pub = sim.publisher(name="work", notifier=NotifierRoundRobin)
for i in range(4):
work_pub.add_subscriber(workers[i], work_pub.Phase.CONSUME)
# Event 1: workers visited starting at index 0 → worker[0] gets first chance
# Event 2: visited starting at index 1 → worker[1] gets first chance
# Event 3: visited starting at index 2 → worker[2] gets first chance
# ...
Without round-robin, worker[0] would always be tried first and would absorb all events as long as it accepts them, starving the others.
NotifierPriority — priority ordering¶
Subscribers are sorted by a numeric priority key; lower numbers are visited first. The priority is passed as an extra argument to add_subscriber:
from dssim import NotifierPriority
alert_pub = sim.publisher(name="alerts", notifier=NotifierPriority)
alert_pub.add_subscriber(critical_handler, alert_pub.Phase.CONSUME, priority=0)
alert_pub.add_subscriber(normal_handler, alert_pub.Phase.CONSUME, priority=10)
alert_pub.add_subscriber(low_handler, alert_pub.Phase.CONSUME, priority=20)
# On each event: critical_handler is tried first, then normal_handler, then low_handler
This is the natural policy for preemptive resources and priority queues, where a higher-priority requester should always get first access.
When a process subscribes via a context manager, the priority keyword passes through to add_subscriber the same way:
async def high_priority_worker():
with sim.consume(alert_pub, priority=0):
event = await sim.wait(timeout=10)
async def low_priority_worker():
with sim.consume(alert_pub, priority=20):
event = await sim.wait(timeout=10)
If both processes are blocked inside their with blocks simultaneously, alert_pub will always try high_priority_worker first.
Choosing a policy¶
| Situation | Policy |
|---|---|
| Single consumer, or strict first-registered wins | NotifierDict (default) |
| Multiple identical workers, load balancing wanted | NotifierRoundRobin |
| Consumers have different priority levels | NotifierPriority |
3.8 Key Takeaways¶
- A publisher fans events out to all registered subscribers; a subscriber is any object with
send(event). - Connecting two components is a single
add_subscribercall on the publisher endpoint — no adapter code needed. - LiteLayer2 (
DSLitePub): simple fan-out, all subscribers receive every event unconditionally. - PubSubLayer2 (
DSPub): four tiers — PRE, CONSUME, POST_HIT, POST_MISS — separate monitoring, ownership, and reaction into distinct roles. - PRE always runs in full; CONSUME stops at the first accepting subscriber; POST phases depend on the consume outcome.
- PubSubLayer2 subscription context managers (
sim.consume,sim.observe_pre,sim.observe_consumed,sim.observe_unconsumed) subscribe the current process for the duration of awithblock and unsubscribe automatically on exit. They can be combined with+and nested freely. - Within each tier, dispatch order is controlled by the publisher's notifier policy:
NotifierDict(insertion order, default),NotifierRoundRobin(rotating start, fair load distribution), orNotifierPriority(lowest number first, set per subscriber).