Chapter 4: Condition Filtering¶
PubSubLayer2
Everything in this chapter is specific to PubSubLayer2. LiteLayer2 has no condition evaluation machinery — subscriber wake-ups are driven entirely by queue/resource sentinel signals.
Condition filtering is the mechanism that prevents spurious wakeups. When a process or subscriber specifies a cond, the scheduler evaluates it against every incoming event before resuming the caller. The caller only wakes when the condition passes — or when the timeout expires and None is returned.
The same principle applies as in the rest of DSSim: keep simulation usage simple while retaining rich flexibility in how components and conditions are composed.
There are two fundamentally different ways a process can wait for a
condition. The first uses sim.wait() directly — the process itself must
be subscribed to the source endpoint so events reach it:
┌─────────────────────────┐
┌───────────┐ event │ Process │
│ endpoint ├──────────────────────────────▶ │ │
│ (pub) │ │ ┌───────┐ │
└───────────┘ │ │ cond │ sim.wait() │
│ │ (λ) │ │
subscription required │ └───────┘ │
(sim.observe_pre / consume) └─────────────────────────┘
The cond lives inside the process — the scheduler evaluates it
against every event that arrives. The process must be subscribed to the
endpoint before the wait.
The second approach interposes filters and a circuit between the endpoints and the process. Each filter subscribes independently to its own source endpoint and tracks its own state. The circuit combines the filter states with AND / OR logic and wakes the process only when the overall expression is satisfied:
┌──────────────────┐ ┌─────────────┐
pub_A ──────event────▶│ ┌────┐ filter A │──┐ │ Process │
│ │cond│REEVALUATE│ │ │ │
│ │(λ) │ │ │ │ circuit │
└──┴────┴──────────┘ │ │ .wait() │
│ ╔═══════╗ │ │
┌──────────────────┐ ├─▶║ AND ║────▶│ (no user │
pub_B ──────event────▶│ ┌────┐ filter B │──┤ ╚═══════╝ │ cond — │
│ │cond│ LATCH │ │ (circuit) │ just │
│ │(λ) │ │ │ │ accepts │
└──┴────┴──────────┘ │ │ result) │
│ │ │
┌──────────────────┐ │ └─────────────┘
pub_C ──────event────▶│ ┌────┐ filter C │──┘
│ │cond│ PULSED │
│ │(λ) │ │
└──┴────┴──────────┘
◄─── filters attach ──▶ ◄── circuit ──▶ ◄── process ──▶
to source endpoints evaluates subscribes to
independently, combined circuit's
cond lives inside state wake-up ep
each filter
All condition logic lives inside the filters. The process calls
circuit.wait() which simply blocks until the circuit fires — there is no
additional user-supplied condition at the process level. The process never
subscribes to pub_A, pub_B, or pub_C directly; it only subscribes
to the circuit's internal wake-up endpoint.
This two-level architecture is what makes DSCircuit possible: five
independent asynchronous sources can feed into a single AND expression
without the process needing to manage subscriptions, state, or
re-evaluation manually.
4.1 Condition Types¶
A condition can be any of the following:
Plain value — exact match¶
event = await sim.wait(timeout=5, cond="ready")
# wakes only when the event equals "ready"
The condition is satisfied when event == cond.
Callable — predicate¶
event = await sim.wait(timeout=5, cond=lambda e: isinstance(e, int) and e > 10)
# wakes only when the event is an integer greater than 10
Any callable that takes one argument and returns a truthy value is a valid condition. This includes plain functions, methods, and DSFilter objects (which implement __call__).
DSFilter — reusable, named condition¶
ready = sim.filter(cond=lambda e: e.status == "ready")
event = await ready.wait(timeout=10)
DSFilter wraps a condition in a reusable object with additional signal-type control (see Section 4.2). Use filter.wait() to wait on it — see Section 4.5 for details.
DSCircuit — composed condition¶
both_ready = filter_a & filter_b
event = await both_ready.wait(timeout=10)
DSCircuit combines multiple DSFilter objects with AND or OR logic (see Section 4.3). Use circuit.wait() to wait on it — see Section 4.5 for details.
DSFuture / DSProcess — wait for completion¶
A DSFuture or DSProcess used as a condition makes the wait resolve when the future finishes (i.e., .finished() is True).
Every future has an internal publisher that fires a completion event when finish() is called. Two things are required for the wait to work:
- subscription — the waiting process must be subscribed to the future's internal publisher so the completion event reaches it
- condition —
cond=taskgates the wake-up: the process only resumes whentask.finished()isTrue, ignoring any other events that may arrive
The preferred form is task.wait(), which handles both automatically:
task = sim.process(worker_gen()).schedule(0)
result = await task.wait(timeout=10)
# result is None if the task did not finish within 10 time units
The equivalent explicit form using sim.wait() requires a subscription context manager alongside cond=task:
with sim.consume(task):
result = await sim.wait(cond=task, timeout=10)
cond=task alone is not enough — without the context manager, no completion event reaches the waiting process and the wait never resolves. The context manager alone is not enough either — without cond=task, the process wakes on the first event from any subscribed source, not specifically the task's completion.
AlwaysTrue / AlwaysFalse — named sentinels¶
Two named callable constants are provided for clarity in situations where a literal True or False condition is intended:
from dssim import AlwaysTrue, AlwaysFalse
AlwaysTrue — every event satisfies the condition. It is the default cond for all wait() calls, so omitting cond is equivalent to passing AlwaysTrue explicitly:
event = await sim.wait(timeout=5) # same as cond=AlwaysTrue
event = await sim.wait(timeout=5, cond=AlwaysTrue)
AlwaysFalse — no event ever satisfies the condition. The call can only return via timeout, making it a pure time-delay primitive:
await sim.wait(timeout=3, cond=AlwaysFalse) # sleeps exactly 3 time units
This is how sim.sleep(3) is implemented internally. AlwaysFalse is also the default condition for check_and_wait(): the pre-check runs first; if already satisfied the call returns immediately, otherwise it blocks until timeout — no random event will trigger an early wake-up.
None — always true¶
Omitting cond or passing None is equivalent to AlwaysTrue: any event satisfies the condition. The explicit AlwaysTrue form is preferred when the intent should be clear in code.
4.1.1 Subscription is not automatic with sim.wait(cond=...)¶
Common pitfall
Using a condition in sim.wait(cond=...) does not automatically subscribe the process to any publisher. If no events reach the process, the condition is never evaluated and the wait blocks forever — or until timeout.
A frequent mistake is referencing task futures inside a plain lambda condition:
# BROKEN — the process is not subscribed to task1 or task2's completion publishers.
# No completion event ever arrives, so the lambda is never evaluated.
await sim.wait(cond=lambda e: task1.finished() and task2.finished())
The condition is evaluated against each event that is delivered to the process. Without a subscription, no events arrive and the wait hangs indefinitely.
The rule:
| What you call | Subscribes automatically? | Tier | Publisher |
|---|---|---|---|
filter.wait(timeout) |
Yes | PRE | filter attaches to source endpoints; process subscribes to filter's wake-up |
circuit.wait(timeout) |
Yes | PRE | all constituent filters attach to their source endpoints; process subscribes to circuit's wake-up |
task.wait(timeout) |
Yes | PRE | task._finish_tx (completion) |
future.wait(timeout) |
Yes | PRE | future._finish_tx (completion) |
queue.get(timeout) |
Yes | CONSUME | queue.tx_nempty or queue.tx_changed |
queue.put(timeout) |
Yes | CONSUME | queue.tx_nfull |
queue.wait(timeout) |
Yes | CONSUME | queue.tx_nempty or queue.tx_changed |
container.get(timeout) |
Yes | CONSUME | container.tx_nempty or container.tx_changed |
container.wait(timeout) |
Yes | CONSUME | container.tx_nempty or container.tx_changed |
resource.get_n(timeout) |
Yes | CONSUME | resource.tx_nempty |
resource.put_n(timeout) |
Yes | CONSUME | resource.tx_nfull |
stateful.wait(timeout) |
Yes | CONSUME | component.tx_changed |
sim.filter(policy=queue.policy_for_get(...)).check_and_wait(timeout) |
Yes | CONSUME | queue.tx_nempty or queue.tx_changed |
sim.filter(policy=queue.policy_for_put(...)).check_and_wait(timeout) |
Yes | CONSUME | queue.tx_nfull |
sim.filter(policy=queue.policy_for_observe(...)).check_and_wait(timeout) |
Yes | PRE | queue.tx_changed |
sim.filter(policy=resource.policy_for_get(...)).check_and_wait(timeout) |
Yes | CONSUME | resource.tx_nempty |
sim.wait(cond=task) |
Yes — special case: process auto-subscribes when cond is a DSFuture |
PRE | task._finish_tx |
sim.wait(cond=lambda …) |
No — you must subscribe manually | — | — |
sim.wait(cond=value) |
No — you must subscribe manually | — | — |
agent.wait(timeout, cond=…) |
No — delegates to sim.wait |
— | — |
Queue/resource claim policies are one-shot latch policies. If you reuse the same policy-based filter in a loop, reset it between cycles (or build a fresh filter):
f_q = sim.filter(policy=q.policy_for_put())
f_r = sim.filter(policy=r.policy_for_get())
for _ in range(2):
_ = await f_q.check_and_wait(10)
_ = await f_r.check_and_wait(10)
await do_something_else()
f_q.reset()
f_r.reset()
Lambdas are opaque. Even when you wrap a condition inside a DSFilter, the filter only sees a callable — it has no way to know that the lambda references task1 or task2 internally, so it still cannot subscribe to their publishers automatically. This is equally broken:
# STILL BROKEN — the filter cannot introspect the lambda to find task1 or task2
done_a = sim.filter(cond=lambda e: task1.finished(), sigtype=SignalType.LATCH)
done_b = sim.filter(cond=lambda e: task2.finished(), sigtype=SignalType.LATCH)
await (done_a & done_b).wait(timeout=30)
The solution is to pass the tasks directly as the condition. When a DSFilter receives a DSProcess or DSFuture as its cond, it knows exactly which publisher to subscribe to:
# CORRECT — the filter knows task1 and task2 are futures and subscribes to them
done_a = sim.filter(cond=task1, sigtype=SignalType.LATCH)
done_b = sim.filter(cond=task2, sigtype=SignalType.LATCH)
await (done_a & done_b).wait(timeout=30)
Or subscribe manually when using a lambda condition:
# CORRECT — manual subscription covers both tasks
with sim.consume(task1) + sim.consume(task2):
await sim.wait(cond=lambda e: task1.finished() and task2.finished(), timeout=30)
4.2 DSFilter and Signal Types¶
DSFilter is a reusable condition object that subscribes directly to source
endpoints. Use sim.filter() to create one:
f = sim.filter(cond=lambda e: e > 100)
A filter can optionally be bound to specific source publishers via the eps
parameter. The filter subscribes to these endpoints in the PRE phase and
evaluates every incoming event against its condition:
f = sim.filter(cond=lambda e: e > 100, eps=[sensor_pub])
When eps is omitted, the filter extracts endpoints from its cond if
cond implements get_eps() (e.g. DSFuture or DSProcess).
A DSFilter can be passed as the cond of any wait call or composed
into a DSCircuit.
Signal Types¶
The sigtype parameter controls how the filter behaves after it first matches:
| Signal type | Constant | Behaviour |
|---|---|---|
| Monostable | SignalType.LATCH |
Once triggered, stays triggered. Subsequent events do not reset it. |
| Reevaluate | SignalType.REEVALUATE |
Re-evaluates on every event. The signal state can toggle — it reflects whether the latest event passed. |
| Pulsed | SignalType.PULSED |
Triggers momentarily. Never stays in a triggered state; finished() always returns None. |
The tables below use a filter with cond=lambda e: e > 0 and trace the filter's internal state through a sequence of events.
Monostable (SignalType.LATCH) — latches on first match, ignores everything after:
| Event | Condition e > 0 |
Filter state | Wakes waiter? |
|---|---|---|---|
5 |
✓ pass | latched | Yes |
-3 |
✗ fail | latched | — (already resolved) |
2 |
✓ pass | latched | — (already resolved) |
Once latched, the filter's state never changes. A late wait() on an already-latched filter returns immediately.
Reevaluate (SignalType.REEVALUATE) — reflects the current event; state can toggle back and forth:
| Event | Condition e > 0 |
Filter state | Wakes waiter? |
|---|---|---|---|
5 |
✓ pass | high | Yes |
-3 |
✗ fail | low | No — condition no longer met |
2 |
✓ pass | high | Yes — wakes again |
-1 |
✗ fail | low | No |
A process waiting on a reevaluate filter is woken every time the condition transitions from false to true. Useful for level-sensitive readiness conditions that can become un-ready.
Pulsed (SignalType.PULSED) — fires once per matching event, never latches:
| Event | Condition e > 0 |
Filter state | Wakes waiter? |
|---|---|---|---|
-3 |
✗ fail | idle | No |
5 |
✓ pass | pulse → idle | Yes |
2 |
✓ pass | pulse → idle | Yes — wakes again on next match |
-1 |
✗ fail | idle | No |
The filter fires a momentary pulse for each matching event and immediately returns to idle. finished() always returns None — there is no latched state to query.
from dssim.pubsub.cond import DSFilter
SignalType = DSFilter.SignalType
# monostable: once seen, stays True
f_latch = sim.filter(cond=lambda e: e == "done", sigtype=SignalType.LATCH)
# reevaluate: reflects current state
f_level = sim.filter(cond=lambda e: e > 0, sigtype=SignalType.REEVALUATE)
# pulsed: triggers once, never latches
f_pulse = sim.filter(cond=lambda e: e == "tick", sigtype=SignalType.PULSED)
Checking a Filter Directly¶
A DSFilter is callable — it evaluates the condition and returns a bool:
f = sim.filter(cond=lambda e: e > 5)
print(f(3)) # False
print(f(10)) # True
4.3 DSCircuit — Composing Filters¶
DSCircuit combines filters using Python's | (OR) and & (AND) operators. The result is another composable object that can be used anywhere a condition is accepted.
OR — any condition matches¶
f1 = sim.filter(cond=lambda e: e == "A")
f2 = sim.filter(cond=lambda e: e == "B")
either = f1 | f2
event = await either.wait(timeout=5)
# wakes when event is "A" or "B"
AND — all conditions must match¶
channel_ready = sim.filter(cond=lambda e: e.channel_ok)
credit_ok = sim.filter(cond=lambda e: e.credits > 0)
both = channel_ready & credit_ok
event = await both.wait(timeout=10)
# wakes when a single event satisfies both
Chaining¶
Operators chain naturally. Multiple filters of the same kind flatten into a single circuit:
f1 | f2 | f3 # one OR circuit with three arms — not (f1 | f2) | f3
f1 & f2 & f3 # one AND circuit with three arms
Mixed operators follow Python precedence (& binds tighter than |):
f1 | f2 & f3 # equivalent to f1 | (f2 & f3)
Negation — reset semantics¶
Prefixing a filter with - inverts its role. A negated filter acts as a resetter: when its condition matches, it clears the latched state of all positive filters in the circuit, as if none of them had ever fired.
Consider a three-input circuit that resolves only when both A and B have been seen, unless C arrives to cancel the sequence:
seen_a = sim.filter(cond=lambda e: e == "A", sigtype=SignalType.LATCH)
seen_b = sim.filter(cond=lambda e: e == "B", sigtype=SignalType.LATCH)
reset_c = sim.filter(cond=lambda e: e == "C")
ready = seen_a & seen_b & -reset_c
event = await ready.wait(timeout=10)
The event sequence below shows what happens to the circuit state at each step:
| Event | seen_a |
seen_b |
-reset_c |
Circuit resolves? |
|---|---|---|---|---|
"A" |
✓ latched | — | — | No — seen_b not yet seen |
"B" |
✓ latched | ✓ latched | — | Yes — both positive filters satisfied |
If C arrives before both A and B are latched, the circuit resets:
| Event | seen_a |
seen_b |
-reset_c |
Circuit resolves? |
|---|---|---|---|---|
"A" |
✓ latched | — | — | No |
"C" |
✗ cleared | ✗ cleared | fired | No — reset; state lost |
"B" |
— | ✓ latched | — | No — seen_a must be seen again |
"A" |
✓ latched | ✓ latched | — | Yes |
The resetter targets the inner accumulated state of the positive filters — it does not affect the circuit's ability to resolve later once the positive conditions are re-satisfied.
Return value¶
When a DSCircuit resolves, wait returns a dict mapping each signaled
filter to its matched value:
result = await (f_temp & f_pres & f_op).wait(timeout=10)
# result is {f_temp: 179.8, f_pres: 12.1, f_op: 3}
Each value is the actual event that satisfied the corresponding filter — the raw sensor reading, not just True/False. For nested circuits, the dict is flattened: all leaf filters appear at the top level.
Circuit Mode vs Leaf Mode (Loop Semantics)¶
For a common loop pattern:
for cycle in range(2):
payload = await circuit.check_and_wait(timeout=20) # first await
await something_else.wait(timeout=20) # second await
# circuit may be signaled again while blocked in the second await
the next loop-cycle check_and_wait(...) behaves as follows:
Circuit sigtype |
Leaf sigtype |
Circuit one_shot |
Next-cycle first check_and_wait |
Payload returned | Need third signal? |
|---|---|---|---|---|---|
REEVALUATE |
REEVALUATE |
False |
Immediate (already true) | Updated to latest leaf values | No |
REEVALUATE |
LATCH |
False |
Immediate | Stale (leaf values remain latched) | No |
LATCH |
REEVALUATE |
False |
Immediate | Stale (circuit value is latched) | No |
LATCH |
LATCH |
False |
Immediate | Stale (fully latched) | No |
REEVALUATE or LATCH |
any | True |
Often immediate from previously latched/signaled state; circuit detached after first hit so re-signal during step 2 is typically not tracked | Usually stale (first-hit payload) | Usually No (but this is typically not the intended loop semantics) |
one_shot=True is optimized for one-time waits. For looped multi-cycle waits,
prefer one_shot=False; otherwise reset/rebuild between cycles.
If you need per-cycle fresh payloads, use REEVALUATE leaves +
REEVALUATE circuit with one_shot=False, or call reset() between cycles
(or rebuild fresh filters/circuit).
4.4 How Filters Subscribe to Endpoints¶
A DSFilter subscribes directly to its source endpoints in the PRE
phase. This happens automatically when you provide eps at construction
time or when the filter's cond exposes endpoints via get_eps():
f = sim.filter(cond=lambda e: e.type == "DATA", eps=[pub])
The filter sits on the publisher permanently — it receives events even
when no process is currently waiting on it. When f.wait() is called,
the process subscribes to the filter's internal wake-up endpoint and
suspends. The next time the filter matches an event from pub, it
signals its internal endpoint and the waiting process resumes.
This architecture means the filter tracks state independently. A monostable filter latches even before any process starts waiting:
seen_init = sim.filter(cond=lambda e: e == "INIT", sigtype=SignalType.LATCH, eps=[pub])
# ... time passes, other code runs, "INIT" arrives ...
# By the time we wait, "INIT" may have already been seen — check_and_wait handles this
result = await seen_init.check_and_wait(timeout=5)
Lazy vs eager attach¶
By default, endpoint subscriptions are deferred until the first wait() /
gwait() call. Call .attach() when you want explicit eager subscription:
# Subscribes to pub immediately — filter accumulates state from the start
f = sim.filter(cond=lambda e: e == "INIT", eps=[pub]).attach()
Eager (.attach()) |
Lazy (default) | |
|---|---|---|
| Subscription | When .attach() is called |
On first wait() / gwait() |
| State accumulation | Yes — filter state builds from the attach point | Only after the first wait attaches |
| Use when | The filter must track events before any wait | A one-shot conditional wait is sufficient |
One-shot auto-detach¶
By default, a filter automatically detaches from its endpoints after the
first successful match (one_shot=True). This is the right behaviour for
the typical pattern where a fresh filter/circuit is built for each wait:
# Build, wait, done — the circuit detaches itself after firing
result = yield from (f_a & f_b).gwait()
For long-lived filters that must keep tracking state across multiple wait calls, disable auto-detach:
f = sim.filter(cond=lambda e: e > 100, eps=[sensor_pub], one_shot=False)
# First wait
result = await f.wait(timeout=10)
# f is still attached — state continues accumulating
# Second wait
result = await f.wait(timeout=10)
When a filter is part of a circuit (registered as a child via
register_listener), its individual one_shot detach is deferred to the
parent circuit. The filter stays attached to its endpoints until the
circuit fires and calls detach() on the entire tree. This prevents a
PULSED filter from disconnecting after its first pulse while other filters
in the AND have not yet been satisfied.
On a circuit, set_one_shot(one_shot, recursive=True) propagates the
setting to all child filters:
circuit = f_a & f_b & f_c
circuit.set_one_shot(False) # keeps all filters alive after firing
4.5 Waiting on a Filter or Circuit¶
Both DSFilter and DSCircuit expose a .wait() method. This is the preferred way to block on a condition:
result = await my_filter.wait(timeout=10)
result = await my_circuit.wait(timeout=10)
What .wait() does¶
Calling filter.wait(timeout) performs two things:
- Attaches the filter to its source endpoints (if not already attached). The filter subscribes to each source publisher in the PRE phase and evaluates incoming events against its condition.
- Subscribes the calling process to the filter's internal wake-up endpoint and blocks until the filter signals a match.
This two-level architecture means:
- The filter receives events directly from its source publishers — not forwarded through the waiting process.
- The waiting process only wakes when the filter's condition is satisfied.
- Subscription and unsubscription are automatic — no
withblock, no publisher reference needed. - With
one_shot=True(the default), the filter and circuit automatically detach from all endpoints after the first successful match. This makes the common pattern of building a fresh circuit per wait zero-overhead in terms of leftover subscriptions.
Pre-checking before blocking¶
check_and_wait(timeout) checks whether the condition is already satisfied before registering the wait. If it is, the method returns immediately with the cached event — no suspension needed:
# if the filter already fired, returns immediately; otherwise blocks
result = await my_filter.check_and_wait(timeout=10)
This is useful for conditions that may have been satisfied before the current process started watching.
Generator variants¶
For components written as plain generators (not coroutines), use:
result = yield from my_filter.gwait(timeout=10)
result = yield from my_filter.check_and_gwait(timeout=10)
4.6 Advanced: Process and Future Conditions¶
DSFuture and DSProcess objects can be used directly as conditions. This enables patterns where one process waits for another to reach a specific state or produce a result.
Waiting for a Future to resolve¶
A DSFuture is a promise: any process can wait on it, and all waiters wake simultaneously when finish() is called.
async def initializer():
await sim.sleep(3) # simulate startup work
init_done.finish({"port": 8080, "version": 2})
async def server():
config = await sim.wait(cond=init_done)
print(f"t={sim.time}: listening on port {config['port']}")
async def monitor():
config = await sim.wait(cond=init_done) # same future, multiple waiters
print(f"t={sim.time}: monitor saw version {config['version']}")
init_done = sim.future()
sim.process(initializer()).schedule(0)
sim.process(server()).schedule(0)
sim.process(monitor()).schedule(0)
sim.run(until=10)
# t=3: listening on port 8080
# t=3: monitor saw version 2
All processes waiting on the same DSFuture receive the value passed to finish() and resume at the same simulation time. The future is permanently resolved after that — late arrivals that wait(cond=init_done) after finish() wake immediately.
Waiting for a process to finish¶
A DSProcess is itself a DSFuture subtype. Using it as a cond blocks until the process completes and returns its value:
async def worker():
await sim.sleep(5)
return 42
task = sim.process(worker()).schedule(0)
async def master():
result = await sim.wait(cond=task) # blocks until task.finished() is True
print(f"worker returned {result}") # worker returned 42
If the worker was aborted, wait(cond=task) still returns — result will be None and task.exc will hold the exception.
Combining a process condition with a timeout¶
async def master():
result = await sim.wait(timeout=10, cond=task)
if result is None:
print("task did not finish in time")
else:
print(f"task returned {result}")
The timeout and the process condition are independent. Whichever fires first — process completion or timeout expiry — wakes the waiter. None always means timeout.
Using a DSFilter around a process condition¶
A DSFilter can wrap a DSProcess as its condition. The filter signals
when the wrapped process finishes (i.e. process.finished() is True).
The filter subscribes directly to the process's completion endpoint — no
event forwarding is performed:
async def initializer():
await sim.sleep(3)
return {"port": 8080}
task = sim.process(initializer()).schedule(0)
f = sim.filter(cond=task, sigtype=SignalType.LATCH)
result = await f.wait(timeout=20)
# result is the value returned by initializer ({"port": 8080})
4.7 Advanced: Complex Circuits in Practice¶
The following patterns show how to compose filters and circuits for real coordination problems.
Source-scoped filters with eps¶
When a circuit combines filters from independent publishers, each filter
must receive events only from its own source. Bind each filter directly to
explicit source endpoints with eps so unrelated events never trigger
re-evaluation:
temp_pub = DSPub(name='temp', sim=sim)
pres_pub = DSPub(name='pres', sim=sim)
f_temp = sim.filter(cond=lambda e: abs(e - 180) <= 5,
sigtype=SignalType.REEVALUATE,
eps=[temp_pub])
f_pres = sim.filter(cond=lambda e: abs(e - 12) <= 0.5,
sigtype=SignalType.REEVALUATE,
eps=[pres_pub])
ready = f_temp & f_pres
result = await ready.wait(timeout=60)
Each filter subscribes only to its own publisher. When temp_pub signals
179.8, only f_temp receives and re-evaluates it — f_pres keeps its
cached state. Without source scoping, the pressure filter would see 179.8
and evaluate abs(179.8 - 12) <= 0.5 → False, incorrectly resetting its
REEVALUATE state.
You can also pass eps directly to the filter when the condition is a
simple callable:
f_temp = sim.filter(cond=lambda e: abs(e - 180) <= 5,
sigtype=SignalType.REEVALUATE,
eps=[temp_pub])
Pattern 1: Ordered two-phase handshake¶
A monostable filter latches True permanently once it fires. Combining two monostable filters in AND produces a circuit that resolves only after both events have been seen — in any order.
from dssim.pubsub.cond import DSFilter
SignalType = DSFilter.SignalType
req_seen = sim.filter(cond=lambda e: e == "REQUEST", sigtype=SignalType.LATCH)
ack_seen = sim.filter(cond=lambda e: e == "ACK", sigtype=SignalType.LATCH)
handshake = req_seen & ack_seen
result = await handshake.wait(timeout=20)
# resolves as soon as both REQUEST and ACK have been delivered
# (order does not matter)
Pattern 2: Arm / fire / cancel¶
A pulsed filter for the trigger event prevents it from latching while a negated filter resets the armed state on cancellation:
armed = sim.filter(cond=lambda e: e.type == "ARM")
cancel = sim.filter(cond=lambda e: e.type == "CANCEL")
fire = sim.filter(cond=lambda e: e.type == "FIRE", sigtype=SignalType.PULSED)
# Resolves when: ARM has been seen AND FIRE arrives AND CANCEL has not appeared
trigger = armed & fire & -cancel
result = await trigger.wait(timeout=30)
if result is None:
print("timed out — never triggered")
else:
print(f"fired at t={sim.time}")
The -cancel resetter clears armed whenever a CANCEL event arrives, so a CANCEL after ARM but before FIRE prevents the trigger from ever resolving.
Pattern 3: Any error from multiple sources¶
Three independent error conditions collapsed into one OR circuit:
timeout_err = sim.filter(cond=lambda e: e.code == "TIMEOUT")
crc_err = sim.filter(cond=lambda e: e.code == "CRC_FAIL")
overflow = sim.filter(cond=lambda e: e.code == "OVERFLOW")
any_error = timeout_err | crc_err | overflow
fault = await any_error.wait(timeout=100)
if fault is not None:
print(f"fault detected: {fault.code} at t={sim.time}")
Pattern 4: Level-sensitive readiness with reevaluate¶
A REEVALUATE filter tracks the current value of a condition, not whether it was ever true. Use this when a resource can become un-ready after being ready:
link_up = sim.filter(
cond=lambda e: e.link_state == "UP",
sigtype=SignalType.REEVALUATE,
)
credits_ok = sim.filter(
cond=lambda e: e.credits > 0,
sigtype=SignalType.REEVALUATE,
)
can_send = link_up & credits_ok
# Blocks until both conditions are simultaneously true
event = await can_send.wait(timeout=50)
If link_state drops back to "DOWN" while waiting, link_up re-evaluates to False and the circuit goes dark again. The waiter stays blocked until both are true at the same time.
Pattern 5: Sharing a circuit across processes¶
A DSCircuit holds state. If you share a circuit object between two processes, both processes observe the same filter state. This is a useful way to model a shared global readiness condition but requires care: with the default one_shot=True, the circuit detaches after its first successful match, independently of any waiting process.
For shared circuits that must survive across multiple waiters, disable auto-detach and create a fresh circuit once:
ready = chan_ok & cred_ok
ready.set_one_shot(False) # stays attached — multiple processes can wait on it
If you need per-waiter isolation, create a fresh circuit for each waiter:
def make_ready_circuit():
chan_ok = sim.filter(cond=lambda e: e.chan_ok, sigtype=SignalType.LATCH, eps=[pub])
cred_ok = sim.filter(cond=lambda e: e.cred_ok, sigtype=SignalType.LATCH, eps=[pub])
return chan_ok & cred_ok
async def sender(i):
circuit = make_ready_circuit() # each sender gets its own instance
await circuit.wait(timeout=20)
# circuit auto-detaches after firing (one_shot=True by default)
print(f"sender {i} proceeding")
4.8 Key Takeaways¶
- Seven condition types: plain value (exact match), callable (predicate),
AlwaysTrue/AlwaysFalse(named sentinels),DSFilter(reusable),DSCircuit(composed),DSFuture/DSProcess(completion),None(equivalent toAlwaysTrue). DSFiltersignal types: monostable (LATCH) latches forever; reevaluate (REEVALUATE) reflects the current event; pulsed (PULSED) fires once without latching.DSCircuitcomposes filters with|(OR) and&(AND); negated filters (-f) act as resetters that clear positive filter state. The return value is a dict mapping each filter to its matched event.- Filters subscribe directly to source endpoints in the PRE phase. The
epsparameter binds a filter to specific publishers, preventing cross-contamination in circuits with independent sources. - Prefer
await filter.wait(timeout)/await circuit.wait(timeout)— the process subscribes to the filter's internal wake-up endpoint, not to the source publishers directly. - Use
check_and_wait(timeout)when the condition may already be satisfied before the wait is registered — it returns immediately if so. DSFuture.finish()wakes all current and future waiters simultaneously; using aDSProcessas a condition waits for the process to return.- Monostable AND circuits model "both events seen in any order"; reevaluate AND circuits model "both conditions simultaneously true right now".
- Filters and circuits auto-detach after the first match by default (
one_shot=True). Useone_shot=Falseorset_one_shot(False)for long-lived filters that must track state across multiple waits. When a filter is part of a circuit, its auto-detach is deferred to the parent circuit — the circuit detaches the entire tree when it fires. - Per-waiter isolation requires creating a fresh circuit instance per waiter — shared circuit objects share state.