hydro_lang/location/
tick.rs

1use std::marker::PhantomData;
2
3use proc_macro2::Span;
4use sealed::sealed;
5use stageleft::{QuotedWithContext, q};
6
7#[cfg(stageleft_runtime)]
8use super::dynamic::DynLocation;
9use super::{Cluster, Location, LocationId, Process};
10use crate::compile::builder::FlowState;
11use crate::compile::ir::{HydroNode, HydroSource};
12#[cfg(stageleft_runtime)]
13use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
14use crate::forward_handle::{ForwardHandle, ForwardRef, TickCycle, TickCycleHandle};
15use crate::live_collections::boundedness::{Bounded, Unbounded};
16use crate::live_collections::optional::Optional;
17use crate::live_collections::singleton::Singleton;
18use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
19use crate::nondet::nondet;
20
21#[sealed]
22pub trait NoTick {}
23#[sealed]
24impl<T> NoTick for Process<'_, T> {}
25#[sealed]
26impl<T> NoTick for Cluster<'_, T> {}
27
28#[sealed]
29pub trait NoAtomic {}
30#[sealed]
31impl<T> NoAtomic for Process<'_, T> {}
32#[sealed]
33impl<T> NoAtomic for Cluster<'_, T> {}
34#[sealed]
35impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
36
37#[derive(Clone)]
38pub struct Atomic<Loc> {
39    pub(crate) tick: Tick<Loc>,
40}
41
42impl<L: DynLocation> DynLocation for Atomic<L> {
43    fn id(&self) -> LocationId {
44        LocationId::Atomic(Box::new(self.tick.id()))
45    }
46
47    fn flow_state(&self) -> &FlowState {
48        self.tick.flow_state()
49    }
50
51    fn is_top_level() -> bool {
52        L::is_top_level()
53    }
54}
55
56impl<'a, L> Location<'a> for Atomic<L>
57where
58    L: Location<'a>,
59{
60    type Root = L::Root;
61
62    fn root(&self) -> Self::Root {
63        self.tick.root()
64    }
65
66    fn name() -> String {
67        format!("Atomic<{}>", L::name())
68    }
69}
70
71#[sealed]
72impl<L> NoTick for Atomic<L> {}
73
74pub trait DeferTick {
75    fn defer_tick(self) -> Self;
76}
77
78/// Marks the stream as being inside the single global clock domain.
79#[derive(Clone)]
80pub struct Tick<L> {
81    pub(crate) id: usize,
82    pub(crate) l: L,
83}
84
85impl<L: DynLocation> DynLocation for Tick<L> {
86    fn id(&self) -> LocationId {
87        LocationId::Tick(self.id, Box::new(self.l.id()))
88    }
89
90    fn flow_state(&self) -> &FlowState {
91        self.l.flow_state()
92    }
93
94    fn is_top_level() -> bool {
95        false
96    }
97}
98
99impl<'a, L> Location<'a> for Tick<L>
100where
101    L: Location<'a>,
102{
103    type Root = L::Root;
104
105    fn root(&self) -> Self::Root {
106        self.l.root()
107    }
108
109    fn name() -> String {
110        format!("Tick<{}>", L::name())
111    }
112}
113
114impl<'a, L> Tick<L>
115where
116    L: Location<'a>,
117{
118    pub fn outer(&self) -> &L {
119        &self.l
120    }
121
122    pub fn spin_batch(
123        &self,
124        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
125    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
126    where
127        L: NoTick,
128    {
129        let out = self
130            .l
131            .spin()
132            .flat_map_ordered(q!(move |_| 0..batch_size))
133            .map(q!(|_| ()));
134
135        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
136    }
137
138    pub fn singleton<T>(
139        &self,
140        e: impl QuotedWithContext<'a, T, Tick<L>>,
141    ) -> Singleton<T, Self, Bounded>
142    where
143        T: Clone,
144    {
145        let e = e.splice_untyped_ctx(self);
146
147        Singleton::new(
148            self.clone(),
149            HydroNode::SingletonSource {
150                value: e.into(),
151                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
152            },
153        )
154    }
155
156    /// Creates an [`Optional`] which has a null value on every tick.
157    ///
158    /// # Example
159    /// ```rust
160    /// # #[cfg(feature = "deploy")] {
161    /// # use hydro_lang::prelude::*;
162    /// # use futures::StreamExt;
163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164    /// let tick = process.tick();
165    /// let optional = tick.none::<i32>();
166    /// optional.unwrap_or(tick.singleton(q!(123)))
167    /// # .all_ticks()
168    /// # }, |mut stream| async move {
169    /// // 123
170    /// # assert_eq!(stream.next().await.unwrap(), 123);
171    /// # }));
172    /// # }
173    /// ```
174    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
175        let e = q!([]);
176        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
177
178        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
179            self.clone(),
180            HydroNode::Source {
181                source: HydroSource::Iter(e.into()),
182                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
183            },
184        );
185
186        unit_optional.map(q!(|_| unreachable!())) // always empty
187    }
188
189    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
190    /// null on all subsequent ticks.
191    ///
192    /// This is useful for bootstrapping stateful computations which need an initial value.
193    ///
194    /// # Example
195    /// ```rust
196    /// # #[cfg(feature = "deploy")] {
197    /// # use hydro_lang::prelude::*;
198    /// # use futures::StreamExt;
199    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
200    /// let tick = process.tick();
201    /// // ticks are lazy by default, forces the second tick to run
202    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
203    /// let optional = tick.optional_first_tick(q!(5));
204    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
205    /// # }, |mut stream| async move {
206    /// // 5, 123, 123, 123, ...
207    /// # assert_eq!(stream.next().await.unwrap(), 5);
208    /// # assert_eq!(stream.next().await.unwrap(), 123);
209    /// # assert_eq!(stream.next().await.unwrap(), 123);
210    /// # assert_eq!(stream.next().await.unwrap(), 123);
211    /// # }));
212    /// # }
213    /// ```
214    pub fn optional_first_tick<T: Clone>(
215        &self,
216        e: impl QuotedWithContext<'a, T, Tick<L>>,
217    ) -> Optional<T, Self, Bounded> {
218        let e_arr = q!([e]);
219        let e = e_arr.splice_untyped_ctx(self);
220
221        Optional::new(
222            self.clone(),
223            HydroNode::Batch {
224                inner: Box::new(HydroNode::Source {
225                    source: HydroSource::Iter(e.into()),
226                    metadata: self
227                        .outer()
228                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
229                }),
230                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
231            },
232        )
233    }
234
235    #[expect(
236        private_bounds,
237        reason = "only Hydro collections can implement ReceiverComplete"
238    )]
239    pub fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
240    where
241        S: CycleCollection<'a, ForwardRef, Location = Self>,
242        L: NoTick,
243    {
244        let next_id = self.flow_state().borrow_mut().next_cycle_id();
245        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
246
247        (
248            ForwardHandle {
249                completed: false,
250                ident: ident.clone(),
251                expected_location: Location::id(self),
252                _phantom: PhantomData,
253            },
254            S::create_source(ident, self.clone()),
255        )
256    }
257
258    #[expect(
259        private_bounds,
260        reason = "only Hydro collections can implement ReceiverComplete"
261    )]
262    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
263    where
264        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
265        L: NoTick,
266    {
267        let next_id = self.flow_state().borrow_mut().next_cycle_id();
268        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
269
270        (
271            TickCycleHandle {
272                completed: false,
273                ident: ident.clone(),
274                expected_location: Location::id(self),
275                _phantom: PhantomData,
276            },
277            S::create_source(ident, self.clone()).defer_tick(),
278        )
279    }
280
281    #[expect(
282        private_bounds,
283        reason = "only Hydro collections can implement ReceiverComplete"
284    )]
285    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
286    where
287        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
288    {
289        let next_id = self.flow_state().borrow_mut().next_cycle_id();
290        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
291
292        (
293            TickCycleHandle {
294                completed: false,
295                ident: ident.clone(),
296                expected_location: Location::id(self),
297                _phantom: PhantomData,
298            },
299            // no need to defer_tick, create_source_with_initial does it for us
300            S::create_source_with_initial(ident, initial, self.clone()),
301        )
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    #[cfg(feature = "sim")]
308    use stageleft::q;
309
310    #[cfg(feature = "sim")]
311    use crate::live_collections::sliced::sliced;
312    #[cfg(feature = "sim")]
313    use crate::location::Location;
314    #[cfg(feature = "sim")]
315    use crate::nondet::nondet;
316    #[cfg(feature = "sim")]
317    use crate::prelude::FlowBuilder;
318
319    #[cfg(feature = "sim")]
320    #[test]
321    fn sim_atomic_stream() {
322        let flow = FlowBuilder::new();
323        let node = flow.process::<()>();
324
325        let (write_send, write_req) = node.sim_input();
326        let (read_send, read_req) = node.sim_input::<(), _, _>();
327
328        let tick = node.tick();
329        let atomic_write = write_req.atomic(&tick);
330        let current_state = atomic_write.clone().fold(
331            q!(|| 0),
332            q!(|state: &mut i32, v: i32| {
333                *state += v;
334            }),
335        );
336
337        let write_ack_recv = atomic_write.end_atomic().sim_output();
338        let read_response_recv = sliced! {
339            let batch_of_req = use(read_req, nondet!(/** test */));
340            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
341            batch_of_req.cross_singleton(latest_singleton)
342        }
343        .sim_output();
344
345        let sim_compiled = flow.sim().compiled();
346        let instances = sim_compiled.exhaustive(async || {
347            write_send.send(1);
348            write_ack_recv.assert_yields([1]).await;
349            read_send.send(());
350            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
351        });
352
353        assert_eq!(instances, 1);
354
355        let instances_read_before_write = sim_compiled.exhaustive(async || {
356            write_send.send(1);
357            read_send.send(());
358            write_ack_recv.assert_yields([1]).await;
359            let _ = read_response_recv.next().await;
360        });
361
362        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
363    }
364
365    #[cfg(feature = "sim")]
366    #[test]
367    #[should_panic]
368    fn sim_non_atomic_stream() {
369        // shows that atomic is necessary
370        let flow = FlowBuilder::new();
371        let node = flow.process::<()>();
372
373        let (write_send, write_req) = node.sim_input();
374        let (read_send, read_req) = node.sim_input::<(), _, _>();
375
376        let current_state = write_req.clone().fold(
377            q!(|| 0),
378            q!(|state: &mut i32, v: i32| {
379                *state += v;
380            }),
381        );
382
383        let write_ack_recv = write_req.sim_output();
384
385        let read_response_recv = sliced! {
386            let batch_of_req = use(read_req, nondet!(/** test */));
387            let latest_singleton = use(current_state, nondet!(/** test */));
388            batch_of_req.cross_singleton(latest_singleton)
389        }
390        .sim_output();
391
392        flow.sim().exhaustive(async || {
393            write_send.send(1);
394            write_ack_recv.assert_yields([1]).await;
395            read_send.send(());
396
397            if let Some((_, v)) = read_response_recv.next().await {
398                assert_eq!(v, 1);
399            }
400        });
401    }
402}