//! hydro_deploy/gcp.rs — Google Cloud Platform (Compute Engine) host provisioning via Terraform.
1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::sync::{Arc, Mutex, OnceLock};
5
6use anyhow::Result;
7use nanoid::nanoid;
8use serde_json::json;
9use tokio::sync::RwLock;
10
11use super::terraform::{TERRAFORM_ALPHABET, TerraformOutput, TerraformProvider};
12use super::{ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult};
13use crate::ssh::LaunchedSshHost;
14use crate::{BaseServerStrategy, HostStrategyGetter, PortNetworkHint};
15
/// A GCP Compute Engine VM that has been provisioned and can be reached over SSH.
pub struct LaunchedComputeEngine {
    // Terraform results of the deployment that provisioned this VM (holds SSH key paths, outputs).
    resource_result: Arc<ResourceResult>,
    // SSH username used to log in to this VM.
    user: String,
    /// Internal (VPC) IP address of the VM.
    pub internal_ip: String,
    /// External IP address, present only when the VM was provisioned with external access.
    pub external_ip: Option<String>,
}
22
23impl LaunchedSshHost for LaunchedComputeEngine {
24    fn get_external_ip(&self) -> Option<String> {
25        self.external_ip.clone()
26    }
27
28    fn get_internal_ip(&self) -> String {
29        self.internal_ip.clone()
30    }
31
32    fn get_cloud_provider(&self) -> String {
33        "GCP".to_string()
34    }
35
36    fn resource_result(&self) -> &Arc<ResourceResult> {
37        &self.resource_result
38    }
39
40    fn ssh_user(&self) -> &str {
41        self.user.as_str()
42    }
43}
44
/// A (possibly pre-existing) GCP VPC network that hosts can be attached to.
#[derive(Debug)]
pub struct GcpNetwork {
    /// GCP project the network lives in.
    pub project: String,
    /// Name of an existing VPC to reuse; when `None`, a new VPC is created
    /// (and this field is then set to its name so later passes reuse it).
    pub existing_vpc: Option<String>,
    // Random suffix used to build unique Terraform resource names for this network.
    id: String,
}
51
52impl GcpNetwork {
53    pub fn new(project: impl Into<String>, existing_vpc: Option<String>) -> Self {
54        Self {
55            project: project.into(),
56            existing_vpc,
57            id: nanoid!(8, &TERRAFORM_ALPHABET),
58        }
59    }
60
61    fn collect_resources(&mut self, resource_batch: &mut ResourceBatch) -> String {
62        resource_batch
63            .terraform
64            .terraform
65            .required_providers
66            .insert(
67                "google".to_string(),
68                TerraformProvider {
69                    source: "hashicorp/google".to_string(),
70                    version: "4.53.1".to_string(),
71                },
72            );
73
74        let vpc_network = format!("hydro-vpc-network-{}", self.id);
75
76        if let Some(existing) = self.existing_vpc.as_ref() {
77            if resource_batch
78                .terraform
79                .resource
80                .get("google_compute_network")
81                .unwrap_or(&HashMap::new())
82                .contains_key(existing)
83            {
84                format!("google_compute_network.{existing}")
85            } else {
86                resource_batch
87                    .terraform
88                    .data
89                    .entry("google_compute_network".to_string())
90                    .or_default()
91                    .insert(
92                        vpc_network.clone(),
93                        json!({
94                            "name": existing,
95                            "project": self.project,
96                        }),
97                    );
98
99                format!("data.google_compute_network.{vpc_network}")
100            }
101        } else {
102            resource_batch
103                .terraform
104                .resource
105                .entry("google_compute_network".to_string())
106                .or_default()
107                .insert(
108                    vpc_network.clone(),
109                    json!({
110                        "name": vpc_network,
111                        "project": self.project,
112                        "auto_create_subnetworks": true
113                    }),
114                );
115
116            let firewall_entries = resource_batch
117                .terraform
118                .resource
119                .entry("google_compute_firewall".to_string())
120                .or_default();
121
122            // allow all VMs to communicate with each other over internal IPs
123            firewall_entries.insert(
124                format!("{vpc_network}-default-allow-internal"),
125                json!({
126                    "name": format!("{vpc_network}-default-allow-internal"),
127                    "project": self.project,
128                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
129                    "source_ranges": ["10.128.0.0/9"],
130                    "allow": [
131                        {
132                            "protocol": "tcp",
133                            "ports": ["0-65535"]
134                        },
135                        {
136                            "protocol": "udp",
137                            "ports": ["0-65535"]
138                        },
139                        {
140                            "protocol": "icmp"
141                        }
142                    ]
143                }),
144            );
145
146            // allow external pings to all VMs
147            firewall_entries.insert(
148                format!("{vpc_network}-default-allow-ping"),
149                json!({
150                    "name": format!("{vpc_network}-default-allow-ping"),
151                    "project": self.project,
152                    "network": format!("${{google_compute_network.{vpc_network}.name}}"),
153                    "source_ranges": ["0.0.0.0/0"],
154                    "allow": [
155                        {
156                            "protocol": "icmp"
157                        }
158                    ]
159                }),
160            );
161
162            self.existing_vpc = Some(vpc_network.clone());
163
164            format!("google_compute_network.{vpc_network}")
165        }
166    }
167}
168
/// A [`Host`] that provisions a GCP Compute Engine VM via Terraform.
pub struct GcpComputeEngineHost {
    /// ID from [`crate::Deployment::add_host`].
    id: usize,

    // GCP project the VM is created in.
    project: String,
    // GCP machine type string passed through to Terraform.
    machine_type: String,
    // Boot disk image passed through to Terraform.
    image: String,
    // Deployment target (e.g. which binary build to use for this host).
    target_type: HostTargetType,
    // NOTE(review): this value is passed as the Terraform `zone` argument below, so
    // despite the field name it should be a GCP zone — confirm with callers.
    region: String,
    // Shared VPC network; locked for writing while collecting resources.
    network: Arc<RwLock<GcpNetwork>>,
    // SSH username override; defaults to "hydro" when `None`.
    user: Option<String>,
    // Optional human-readable name, folded into the VM name.
    display_name: Option<String>,
    /// Set once the VM has been provisioned.
    pub launched: OnceLock<Arc<LaunchedComputeEngine>>, // TODO(mingwei): fix pub
    // External TCP ports requested before launch; each gets a firewall rule.
    external_ports: Mutex<Vec<u16>>,
}
184
185impl Debug for GcpComputeEngineHost {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.write_fmt(format_args!(
188            "GcpComputeEngineHost({} ({:?}))",
189            self.id, &self.display_name
190        ))
191    }
192}
193
194impl GcpComputeEngineHost {
195    #[expect(clippy::too_many_arguments, reason = "used via builder pattern")]
196    pub fn new(
197        id: usize,
198        project: impl Into<String>,
199        machine_type: impl Into<String>,
200        image: impl Into<String>,
201        target_type: HostTargetType,
202        region: impl Into<String>,
203        network: Arc<RwLock<GcpNetwork>>,
204        user: Option<String>,
205        display_name: Option<String>,
206    ) -> Self {
207        Self {
208            id,
209            project: project.into(),
210            machine_type: machine_type.into(),
211            image: image.into(),
212            target_type,
213            region: region.into(),
214            network,
215            user,
216            display_name,
217            launched: OnceLock::new(),
218            external_ports: Mutex::new(Vec::new()),
219        }
220    }
221}
222
223impl Host for GcpComputeEngineHost {
224    fn target_type(&self) -> HostTargetType {
225        self.target_type
226    }
227
228    fn request_port_base(&self, bind_type: &BaseServerStrategy) {
229        match bind_type {
230            BaseServerStrategy::UnixSocket => {}
231            BaseServerStrategy::InternalTcpPort(_) => {}
232            BaseServerStrategy::ExternalTcpPort(port) => {
233                let mut external_ports = self.external_ports.lock().unwrap();
234                if !external_ports.contains(port) {
235                    if self.launched.get().is_some() {
236                        todo!("Cannot adjust firewall after host has been launched");
237                    }
238                    external_ports.push(*port);
239                }
240            }
241        }
242    }
243
244    fn request_custom_binary(&self) {
245        self.request_port_base(&BaseServerStrategy::ExternalTcpPort(22));
246    }
247
248    fn id(&self) -> usize {
249        self.id
250    }
251
252    fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
253        if self.launched.get().is_some() {
254            return;
255        }
256
257        let vpc_path = self
258            .network
259            .try_write()
260            .unwrap()
261            .collect_resources(resource_batch);
262
263        let project = self.project.as_str();
264
265        // first, we import the providers we need
266        resource_batch
267            .terraform
268            .terraform
269            .required_providers
270            .insert(
271                "google".to_string(),
272                TerraformProvider {
273                    source: "hashicorp/google".to_string(),
274                    version: "4.53.1".to_string(),
275                },
276            );
277
278        resource_batch
279            .terraform
280            .terraform
281            .required_providers
282            .insert(
283                "local".to_string(),
284                TerraformProvider {
285                    source: "hashicorp/local".to_string(),
286                    version: "2.3.0".to_string(),
287                },
288            );
289
290        resource_batch
291            .terraform
292            .terraform
293            .required_providers
294            .insert(
295                "tls".to_string(),
296                TerraformProvider {
297                    source: "hashicorp/tls".to_string(),
298                    version: "4.0.4".to_string(),
299                },
300            );
301
302        // we use a single SSH key for all VMs
303        resource_batch
304            .terraform
305            .resource
306            .entry("tls_private_key".to_string())
307            .or_default()
308            .insert(
309                "vm_instance_ssh_key".to_string(),
310                json!({
311                    "algorithm": "RSA",
312                    "rsa_bits": 4096
313                }),
314            );
315
316        resource_batch
317            .terraform
318            .resource
319            .entry("local_file".to_string())
320            .or_default()
321            .insert(
322                "vm_instance_ssh_key_pem".to_string(),
323                json!({
324                    "content": "${tls_private_key.vm_instance_ssh_key.private_key_pem}",
325                    "filename": ".ssh/vm_instance_ssh_key_pem",
326                    "file_permission": "0600"
327                }),
328            );
329
330        let vm_key = format!("vm-instance-{}", self.id);
331        let mut vm_name = format!("hydro-vm-instance-{}", nanoid!(8, &TERRAFORM_ALPHABET),);
332        // Name must match regex: (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?), max length = 63 (61 + 1 a-z before and after)
333        if let Some(mut display_name) = self.display_name.clone() {
334            vm_name.push('-');
335            display_name = display_name
336                .replace("_", "-")
337                .replace(":", "-")
338                .to_lowercase();
339
340            // Keep the latter half of display_name if it is too long
341            let num_chars_to_cut = vm_name.len() + display_name.len() - 63;
342            if num_chars_to_cut > 0 {
343                display_name.drain(0..num_chars_to_cut);
344            }
345            vm_name.push_str(&display_name);
346        }
347
348        let mut tags = vec![];
349        let mut external_interfaces = vec![];
350
351        let external_ports = self.external_ports.lock().unwrap();
352        if external_ports.is_empty() {
353            external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") }));
354        } else {
355            external_interfaces.push(json!({
356                "network": format!("${{{vpc_path}.self_link}}"),
357                "access_config": [
358                    {
359                        "network_tier": "STANDARD"
360                    }
361                ]
362            }));
363
364            // open the external ports that were requested
365            let my_external_tags = external_ports.iter().map(|port| {
366                let rule_id = nanoid!(8, &TERRAFORM_ALPHABET);
367                let firewall_rule = resource_batch
368                    .terraform
369                    .resource
370                    .entry("google_compute_firewall".to_string())
371                    .or_default()
372                    .entry(format!("open-external-port-{}", port))
373                    .or_insert(json!({
374                        "name": format!("open-external-port-{}-{}", port, rule_id),
375                        "project": project,
376                        "network": format!("${{{vpc_path}.name}}"),
377                        "target_tags": [format!("open-external-port-tag-{}-{}", port, rule_id)],
378                        "source_ranges": ["0.0.0.0/0"],
379                        "allow": [
380                            {
381                                "protocol": "tcp",
382                                "ports": vec![port.to_string()]
383                            }
384                        ]
385                    }));
386
387                firewall_rule["target_tags"].as_array().unwrap()[0].clone()
388            });
389
390            tags.extend(my_external_tags);
391
392            resource_batch.terraform.output.insert(
393                format!("{vm_key}-public-ip"),
394                TerraformOutput {
395                    value: format!("${{google_compute_instance.{vm_key}.network_interface[0].access_config[0].nat_ip}}")
396                }
397            );
398        }
399        drop(external_ports); // Drop the lock as soon as possible.
400
401        let user = self.user.as_deref().unwrap_or("hydro");
402        resource_batch
403            .terraform
404            .resource
405            .entry("google_compute_instance".to_string())
406            .or_default()
407            .insert(
408                vm_key.clone(),
409                json!({
410                    "name": vm_name,
411                    "project": project,
412                    "machine_type": self.machine_type,
413                    "zone": self.region,
414                    "tags": tags,
415                    "metadata": {
416                        "ssh-keys": format!("{user}:${{tls_private_key.vm_instance_ssh_key.public_key_openssh}}")
417                    },
418                    "boot_disk": [
419                        {
420                            "initialize_params": [
421                                {
422                                    "image": self.image
423                                }
424                            ]
425                        }
426                    ],
427                    "network_interface": external_interfaces,
428                }),
429            );
430
431        resource_batch.terraform.output.insert(
432            format!("{vm_key}-internal-ip"),
433            TerraformOutput {
434                value: format!(
435                    "${{google_compute_instance.{vm_key}.network_interface[0].network_ip}}"
436                ),
437            },
438        );
439    }
440
441    fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
442        self.launched
443            .get()
444            .map(|a| a.clone() as Arc<dyn LaunchedHost>)
445    }
446
447    fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
448        self.launched
449            .get_or_init(|| {
450                let id = self.id;
451
452                let internal_ip = resource_result
453                    .terraform
454                    .outputs
455                    .get(&format!("vm-instance-{id}-internal-ip"))
456                    .unwrap()
457                    .value
458                    .clone();
459
460                let external_ip = resource_result
461                    .terraform
462                    .outputs
463                    .get(&format!("vm-instance-{id}-public-ip"))
464                    .map(|v| v.value.clone());
465
466                Arc::new(LaunchedComputeEngine {
467                    resource_result: resource_result.clone(),
468                    user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
469                    internal_ip,
470                    external_ip,
471                })
472            })
473            .clone()
474    }
475
476    fn strategy_as_server<'a>(
477        &'a self,
478        client_host: &dyn Host,
479        network_hint: PortNetworkHint,
480    ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
481        if matches!(network_hint, PortNetworkHint::Auto)
482            && client_host.can_connect_to(ClientStrategy::UnixSocket(self.id))
483        {
484            Ok((
485                ClientStrategy::UnixSocket(self.id),
486                Box::new(|_| BaseServerStrategy::UnixSocket),
487            ))
488        } else if matches!(
489            network_hint,
490            PortNetworkHint::Auto | PortNetworkHint::TcpPort(_)
491        ) && client_host.can_connect_to(ClientStrategy::InternalTcpPort(self))
492        {
493            Ok((
494                ClientStrategy::InternalTcpPort(self),
495                Box::new(move |_| {
496                    BaseServerStrategy::InternalTcpPort(match network_hint {
497                        PortNetworkHint::Auto => None,
498                        PortNetworkHint::TcpPort(port) => port,
499                    })
500                }),
501            ))
502        } else if matches!(network_hint, PortNetworkHint::Auto)
503            && client_host.can_connect_to(ClientStrategy::ForwardedTcpPort(self))
504        {
505            Ok((
506                ClientStrategy::ForwardedTcpPort(self),
507                Box::new(|me| {
508                    me.downcast_ref::<GcpComputeEngineHost>()
509                        .unwrap()
510                        .request_port_base(&BaseServerStrategy::ExternalTcpPort(22)); // needed to forward
511                    BaseServerStrategy::InternalTcpPort(None)
512                }),
513            ))
514        } else {
515            anyhow::bail!("Could not find a strategy to connect to GCP instance")
516        }
517    }
518
519    fn can_connect_to(&self, typ: ClientStrategy) -> bool {
520        match typ {
521            ClientStrategy::UnixSocket(id) => {
522                #[cfg(unix)]
523                {
524                    self.id == id
525                }
526
527                #[cfg(not(unix))]
528                {
529                    let _ = id;
530                    false
531                }
532            }
533            ClientStrategy::InternalTcpPort(target_host) => {
534                if let Some(gcp_target) =
535                    <dyn Any>::downcast_ref::<GcpComputeEngineHost>(target_host)
536                {
537                    self.project == gcp_target.project
538                } else {
539                    false
540                }
541            }
542            ClientStrategy::ForwardedTcpPort(_) => false,
543        }
544    }
545}