Skip to content

Commit

Permalink
feat: add discovery for better debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
Wkkkkk authored and Kun Wu committed Jun 25, 2023
1 parent 3edaa79 commit 9091a52
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 71 deletions.
56 changes: 35 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ tracing-actix-web = "0.7"
secrecy = { version = "0.8", features = ["serde"] }
futures = "0.3.28"
gstreamer = "0.20.5"
gstreamer-pbutils = "0.20.5"
clap = { version = "4.3.0", features = ["derive"] }
actix-cors = "0.6.4"
gstreamer-pbutils = "0.20.5"
serde_json = "1.0.61"
derive_more = "0.99.17"
toml = "0.7.5"

[dev-dependencies]
once_cell = "1.7.2"
claims = "0.7.0"
wiremock = "0.5"
reqwest = { version = "0.11", features = ["json"] }
linkify = "0.9"
29 changes: 29 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fs;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct DiscoverConfig {
pub enable_discoverer: bool,
pub discoverer_timeout_sec: u64,
}

impl DiscoverConfig {
pub fn load(path: &str) -> Result<Self, Box<dyn Error>> {
let config = fs::read_to_string(path)?;
let config: Self = toml::from_str(&config)?;
Ok(config)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn server_config_should_be_loaded() {
let result: Result<DiscoverConfig, toml::de::Error> =
toml::from_str(include_str!("discover.conf"));
assert!(result.is_ok());
}
}
2 changes: 2 additions & 0 deletions src/discover.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable_discoverer = true
discoverer_timeout_sec = 15
8 changes: 6 additions & 2 deletions src/domain/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,13 @@ impl SharableAppState {

for (id, conn) in connections.iter_mut() {
if conn.whep_offer.is_none() {
conn.whip_offer = Some(offer);
if conn.whip_offer.is_none() {
conn.whip_offer = Some(offer);

return Ok(id.clone());
return Ok(id.clone());
} else {
return Err(MyError::RepeatedResourceIdError(id.clone()));
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
pub mod domain;
pub mod pipeline;
pub mod routes;
pub mod startup;
pub mod stream;
pub mod telemetry;
18 changes: 11 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use clap::Parser;
use srt_whep::config::DiscoverConfig;
use srt_whep::domain::SharableAppState;
use srt_whep::pipeline::{Args, SharablePipeline};
use srt_whep::startup::run;
use srt_whep::stream::{Args, SharablePipeline};
use srt_whep::telemetry::{get_subscriber, init_subscriber};
use std::error::Error;
use std::net::TcpListener;
use tokio::signal;
use tokio::task;

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let config: DiscoverConfig = toml::from_str(include_str!("discover.conf"))?;

let subscriber = get_subscriber("srt_whep".into(), "info".into(), std::io::stdout);
init_subscriber(subscriber);
Expand All @@ -19,10 +22,10 @@ async fn main() -> std::io::Result<()> {
let listener =
TcpListener::bind(format!("0.0.0.0:{}", args.port)).expect("Whep port is already in use");

let p2 = pipeline_data.clone();
let pipeline_clone = pipeline_data.clone();
// Run the pipeline in a separate thread
let t = task::spawn(async move {
if let Err(error) = p2.setup_pipeline(&args) {
let pipeline_thread = task::spawn(async move {
if let Err(error) = pipeline_clone.setup_pipeline(&args, &config) {
tracing::error!("Failed to setup pipeline: {}", error);
}
});
Expand All @@ -31,11 +34,12 @@ async fn main() -> std::io::Result<()> {
run(listener, app_data, pipeline_data.clone())?.await?;

signal::ctrl_c().await?;
tracing::info!("Received Ctrl-C signal");
tracing::debug!("Received Ctrl-C signal");
// Stop the pipeline
if let Err(error) = pipeline_data.close_pipeline() {
tracing::error!("Failed to close pipeline: {}", error);
}
t.abort();
pipeline_thread.abort();

Ok(())
}
2 changes: 1 addition & 1 deletion src/routes/remove_connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::domain::*;
use crate::pipeline::SharablePipeline;
use crate::stream::SharablePipeline;

use actix_web::{web, HttpResponse};
use anyhow::Context;
Expand Down
14 changes: 7 additions & 7 deletions src/routes/whep_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::domain::*;
use crate::pipeline::SharablePipeline;
use crate::stream::SharablePipeline;
use actix_web::{web, HttpResponse};
use anyhow::Context;
use chrono::Utc;
Expand All @@ -21,28 +21,28 @@ pub async fn subscribe(
)));
}

let resource_id = Uuid::new_v4().to_string();
let connection_id = Uuid::new_v4().to_string();
tracing::info!(
"Create connection {} at time: {:?}",
resource_id.clone(),
connection_id.clone(),
Utc::now()
);

pipeline_state
.add_client(resource_id.clone())
.add_client(connection_id.clone())
.context("Failed to add client")?;

app_state
.add_resource(resource_id.clone())
.add_resource(connection_id.clone())
.context("Failed to add resource")?;

let sdp = app_state
.wait_on_whip_offer(resource_id.clone())
.wait_on_whip_offer(connection_id.clone())
.await
.context("Failed to receive a whip offer")?;

let whep_port = app_state.get_port();
let url = format!("http://localhost:{}/channel/{}", whep_port, resource_id);
let url = format!("http://localhost:{}/channel/{}", whep_port, connection_id);
tracing::info!("Receiving streaming from: {}", url);

Ok(HttpResponse::Created()
Expand Down
6 changes: 3 additions & 3 deletions src/routes/whip_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ pub async fn whip_request(
)));
}

let resource_id = app_state
let connection_id = app_state
.save_whip_offer(sdp)
.context("Failed to save whip offer")?;

let whip_answer = app_state
.wait_on_whep_offer(resource_id.clone())
.wait_on_whep_offer(connection_id.clone())
.await
.context("Failed to receive a whep offer")?;

let whep_port = app_state.get_port();
let url = format!("http://localhost:{}/channel/{}", whep_port, resource_id);
let url = format!("http://localhost:{}/channel/{}", whep_port, connection_id);
tracing::info!("Start streaming at: {}", url);

Ok(HttpResponse::Ok()
Expand Down
2 changes: 1 addition & 1 deletion src/startup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::domain::SharableAppState;
use crate::pipeline::SharablePipeline;
use crate::routes::{health_check, list, patch, remove_connection, subscribe, whip_request};
use crate::stream::SharablePipeline;
use actix_cors::Cors;
use actix_web::dev::Server;
use actix_web::{web, App, HttpServer};
Expand Down
Loading

0 comments on commit 9091a52

Please sign in to comment.