use std::marker::PhantomData;

use proc_macro2::Span;
use sealed::sealed;
use stageleft::{QuotedWithContext, q};

#[cfg(stageleft_runtime)]
use super::dynamic::DynLocation;
use super::{Cluster, Location, LocationId, Process};
use crate::compile::builder::FlowState;
use crate::compile::ir::{HydroNode, HydroSource};
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
use crate::live_collections::boundedness::{Bounded, Unbounded};
use crate::live_collections::optional::Optional;
use crate::live_collections::singleton::Singleton;
use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
use crate::nondet::nondet;

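/// Marker trait for location types that are not a [`Tick`]: implemented for
/// [`Process`], [`Cluster`], and [`Atomic`].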
#[sealed]
pub trait NoTick {}
#[sealed]
impl<T> NoTick for Process<'_, T> {}
#[sealed]
impl<T> NoTick for Cluster<'_, T> {}

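/// Marker trait for location types that are not an [`Atomic`] region:
/// implemented for [`Process`], [`Cluster`], and [`Tick`].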
#[sealed]
pub trait NoAtomic {}
#[sealed]
impl<T> NoAtomic for Process<'_, T> {}
#[sealed]
impl<T> NoAtomic for Cluster<'_, T> {}
#[sealed]
impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}

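/// A location wrapping a [`Tick`]; dataflow placed here is processed
/// atomically with respect to that tick, so effects inside the atomic region
/// become visible to the outside only at tick boundaries (see the
/// `sim_atomic_stream` test at the bottom of this file).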
#[derive(Clone)]
pub struct Atomic<Loc> {
    pub(crate) tick: Tick<Loc>,
}

impl<L: DynLocation> DynLocation for Atomic<L> {
    fn id(&self) -> LocationId {
        LocationId::Atomic(Box::new(self.tick.id()))
    }

    fn flow_state(&self) -> &FlowState {
        self.tick.flow_state()
    }

    fn is_top_level() -> bool {
        L::is_top_level()
    }
}

impl<'a, L> Location<'a> for Atomic<L>
where
    L: Location<'a>,
{
    type Root = L::Root;

    fn root(&self) -> Self::Root {
        self.tick.root()
    }

    fn name() -> String {
        format!("Atomic<{}>", L::name())
    }
}

#[sealed]
impl<L> NoTick for Atomic<L> {}

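/// Implemented by live collections that can defer their contents to the next
/// tick; used by [`Tick::cycle`] so that the cycle's source observes the value
/// produced on the previous tick.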
pub trait DeferTick {
    fn defer_tick(self) -> Self;
}

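/// A logical clock at location `L`. Collections materialized inside a
/// [`Tick`] represent a single batch of data per tick and are therefore
/// [`Bounded`].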
#[derive(Clone)]
pub struct Tick<L> {
    pub(crate) id: usize,
    pub(crate) l: L,
}

impl<L: DynLocation> DynLocation for Tick<L> {
    fn id(&self) -> LocationId {
        LocationId::Tick(self.id, Box::new(self.l.id()))
    }

    fn flow_state(&self) -> &FlowState {
        self.l.flow_state()
    }

    fn is_top_level() -> bool {
        false
    }
}

impl<'a, L> Location<'a> for Tick<L>
where
    L: Location<'a>,
{
    type Root = L::Root;

    fn root(&self) -> Self::Root {
        self.l.root()
    }

    fn name() -> String {
        format!("Tick<{}>", L::name())
    }
}

impl<'a, L> Tick<L>
where
    L: Location<'a>,
{
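    /// Returns the location that this tick's clock is placed on.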
    pub fn outer(&self) -> &L {
        &self.l
    }

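    /// Emits `batch_size` unit values on every tick of this clock, by spinning
    /// the underlying location and batching the results into the tick.
    ///
    /// A minimal usage sketch (not a compiled doctest; assumes a process and
    /// tick set up as in the tests at the bottom of this file):
    ///
    /// ```ignore
    /// let tick = node.tick();
    /// // a bounded stream of four `()` values inside each tick
    /// let units = tick.spin_batch(q!(4));
    /// ```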
    pub fn spin_batch(
        &self,
        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
    where
        L: NoTick,
    {
        let out = self
            .l
            .spin()
            .flat_map_ordered(q!(move |_| 0..batch_size))
            .map(q!(|_| ()));

        out.batch(self, nondet!())
    }

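    /// Materializes a quoted Rust expression as a [`Singleton`] available on
    /// every tick of this clock.
    ///
    /// A minimal usage sketch (not a compiled doctest):
    ///
    /// ```ignore
    /// let zero = tick.singleton(q!(0));
    /// ```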
    pub fn singleton<T>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Singleton<T, Self, Bounded>
    where
        T: Clone,
    {
        let e = e.splice_untyped_ctx(self);

        Singleton::new(
            self.clone(),
            HydroNode::SingletonSource {
                value: e.into(),
                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

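    /// Creates an [`Optional`] of type `T` that is absent on every tick.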
    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
        let e = q!([]);
        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);

        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Iter(e.into()),
                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
            },
        );

        unit_optional.map(q!(|_| unreachable!()))
    }

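    /// Creates an [`Optional`] that holds `e` on the first tick of this clock
    /// and is absent on all later ticks.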
    pub fn optional_first_tick<T: Clone>(
        &self,
        e: impl QuotedWithContext<'a, T, Tick<L>>,
    ) -> Optional<T, Self, Bounded> {
        let e_arr = q!([e]);
        let e = e_arr.splice_untyped_ctx(self);

        Optional::new(
            self.clone(),
            HydroNode::Batch {
                inner: Box::new(HydroNode::Source {
                    source: HydroSource::Iter(e.into()),
                    metadata: self
                        .outer()
                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
                }),
                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
            },
        )
    }

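    /// Creates a forward reference inside this tick: returns a placeholder
    /// collection of type `S` together with a [`ForwardHandle`] that must later
    /// be completed with the collection's actual definition.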
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
    where
        S: CycleCollection<'a, ForwardRef, Location = Self>,
        L: NoTick,
    {
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            ForwardHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source(ident, self.clone()),
        )
    }

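    /// Creates a cycle across ticks: returns a collection of type `S` whose
    /// contents on each tick are whatever the returned [`TickCycleHandle`] is
    /// completed with on the *previous* tick; on the first tick it is empty.
    ///
    /// A minimal sketch of the shape of the API (not a compiled doctest; the
    /// element type and how the handle is completed are up to the caller):
    ///
    /// ```ignore
    /// let (handle, prev_tick_values) =
    ///     tick.cycle::<Stream<i32, _, Bounded, TotalOrder, ExactlyOnce>>();
    /// // `prev_tick_values` can now be used while defining this tick's output,
    /// // and `handle` is completed with that output to close the cycle.
    /// ```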
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
        L: NoTick,
    {
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source(ident, self.clone()).defer_tick(),
        )
    }

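    /// Like [`Tick::cycle`], but the returned collection holds `initial` on the
    /// first tick instead of being empty.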
    #[expect(
        private_bounds,
        reason = "only Hydro collections can implement ReceiverComplete"
    )]
    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
    where
        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
    {
        let next_id = self.flow_state().borrow_mut().next_cycle_id();
        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            TickCycleHandle {
                completed: false,
                ident: ident.clone(),
                expected_location: Location::id(self),
                _phantom: PhantomData,
            },
            S::create_source_with_initial(ident, initial, self.clone()),
        )
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let tick = node.tick();
        let atomic_write = write_req.atomic(&tick);
        let current_state = atomic_write.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = atomic_write.end_atomic().sim_output();
        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!());
            let latest_singleton = use::atomic(current_state, nondet!());
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
        });

        assert_eq!(instances, 1);

        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        assert_eq!(instances_read_before_write, 3);
    }

    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        let flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (write_send, write_req) = node.sim_input();
        let (read_send, read_req) = node.sim_input::<(), _, _>();

        let current_state = write_req.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        let write_ack_recv = write_req.sim_output();

        let read_response_recv = sliced! {
            let batch_of_req = use(read_req, nondet!());
            let latest_singleton = use(current_state, nondet!());
            batch_of_req.cross_singleton(latest_singleton)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}