Skip to content

Commit

Permalink
dependencies: update to tonic 0.12
Browse files Browse the repository at this point in the history
Pending hyperium/tonic#1740. DNM until its
released.

This lets us drop a few shims
  • Loading branch information
howardjohn committed Jun 21, 2024
1 parent 3a41b09 commit 6d01599
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 336 deletions.
554 changes: 250 additions & 304 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ hickory-client = "0.24"
hickory-proto = "0.24"
hickory-resolver = "0.24"
hickory-server = { version = "0.24", features = [ "hickory-resolver" ] }
http-02 = { package = "http", version = "0.2.9" }
http-body-04 = { package = "http-body", version = "0.4" }
http-body-1 = { package = "http-body", version = "1.0.0-rc.2" }
http-body = { package = "http-body", version = "1" }
http-body-util = "0.1"
http-types = { version = "2.12", default-features = false }
hyper = { version = "1.3", features = ["full"] }
Expand Down Expand Up @@ -88,9 +86,8 @@ tls-listener = { version = "0.10" }
tokio = { version = "1.0", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { version = "0.11", default-features = false, features = ["prost", "codegen"] }
tonic = { git ="https://github.com/hyperium/tonic", default-features = false, features = ["prost", "codegen"] }
tower = { version = "0.4", features = ["full"] }
tower-hyper-http-body-compat = { git = "https://github.com/howardjohn/tower-hyper-http-body-compat", branch = "deps/hyper-1.0.0-snapshot1", features = ["server", "http2"] }
tracing = { version = "0.1"}
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter", "json"] }
url = "2.2"
Expand Down
1 change: 0 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,4 @@ skip = [

allow-git = [
"https://github.com/janrueth/boring-rustls-provider",
"https://github.com/howardjohn/tower-hyper-http-body-compat"
]
2 changes: 1 addition & 1 deletion src/hyper_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub fn http2_client() -> client::conn::http2::Builder<TokioExecutor> {

pub fn pooling_client<B>() -> ::hyper_util::client::legacy::Client<HttpConnector, B>
where
B: http_body_1::Body + Send,
B: http_body::Body + Send,
B::Data: Send,
{
::hyper_util::client::legacy::Client::builder(::hyper_util::rt::TokioExecutor::new())
Expand Down
4 changes: 1 addition & 3 deletions src/identity/caclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ impl CaClient {
secret_ttl: i64,
) -> Result<CaClient, Error> {
let svc = tls::grpc_connector(address, cert_provider.fetch_cert().await?)?;
// let client = IstioCertificateServiceClient::new(svc);
// let svc =
// tower_hyper_http_body_compat::Hyper1HttpServiceAsTowerService03HttpService::new(svc);

let client = IstioCertificateServiceClient::with_interceptor(svc, auth);
Ok(CaClient {
client,
Expand Down
1 change: 1 addition & 0 deletions src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod inpod;
pub mod tcp;
pub mod xds;

mod hyper_tower;
#[cfg(target_os = "linux")]
pub mod linux;
#[cfg(target_os = "linux")]
Expand Down
3 changes: 2 additions & 1 deletion src/test_helpers/ca.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tracing::error;
use crate::config::RootCert;

use crate::identity::{AuthSource, CaClient};
use crate::test_helpers::hyper_tower;
use crate::xds::istio::ca::istio_certificate_service_server::{
IstioCertificateService, IstioCertificateServiceServer,
};
Expand Down Expand Up @@ -67,7 +68,7 @@ impl CaServer {
if let Err(err) = crate::hyper_util::http2_server()
.serve_connection(
TokioIo::new(socket),
tower_hyper_http_body_compat::TowerService03HttpServiceAsHyper1HttpService::new(srv)
hyper_tower::TowerToHyperService::new(srv),
)
.await
{
Expand Down
75 changes: 75 additions & 0 deletions src/test_helpers/hyper_tower.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use pin_project_lite::pin_project;
use tonic::body::BoxBody;
use tonic::Status;
use tower::{BoxError, ServiceExt};

// Copied from https://github.com/hyperium/tonic/blob/34b863b1d2a204ef3dd871ec86860fc92aafb451/examples/src/tls_rustls/server.rs

/// An adaptor which converts a [`tower::Service`] to a [`hyper::service::Service`].
///
/// The [`hyper::service::Service`] trait is used by hyper to handle incoming requests,
/// and does not support the `poll_ready` method that is used by tower services.
///
/// This is provided here because the equivalent adaptor in hyper-util does not support
/// tonic::body::BoxBody bodies.
#[derive(Debug, Clone)]
pub struct TowerToHyperService<S> {
service: S,
}

impl<S> TowerToHyperService<S> {
/// Create a new `TowerToHyperService` from a tower service.
pub fn new(service: S) -> Self {
Self { service }
}
}

impl<S> hyper::service::Service<hyper::Request<hyper::body::Incoming>> for TowerToHyperService<S>
where
S: tower::Service<hyper::Request<BoxBody>> + Clone,
S::Error: Into<BoxError> + 'static,
{
type Response = S::Response;
type Error = BoxError;
type Future = TowerToHyperServiceFuture<S, hyper::Request<BoxBody>>;

fn call(&self, req: hyper::Request<hyper::body::Incoming>) -> Self::Future {
use http_body_util::BodyExt;
let req = req.map(|incoming| {
incoming
.map_err(|err| Status::from_error(err.into()))
.boxed_unsync()
});
TowerToHyperServiceFuture {
future: self.service.clone().oneshot(req),
}
}
}

pin_project! {
/// Future returned by [`TowerToHyperService`].
#[derive(Debug)]
pub struct TowerToHyperServiceFuture<S, R>
where
S: tower::Service<R>,
{
#[pin]
future: tower::util::Oneshot<S, R>,
}
}

impl<S, R> std::future::Future for TowerToHyperServiceFuture<S, R>
where
S: tower::Service<R>,
S::Error: Into<BoxError> + 'static,
{
type Output = Result<S::Response, BoxError>;

#[inline]
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.project().future.poll(cx).map_err(Into::into)
}
}
4 changes: 2 additions & 2 deletions src/test_helpers/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status, Streaming};
use tracing::{debug, error, info};

use super::test_config_with_port_xds_addr_and_root_cert;
use super::{hyper_tower, test_config_with_port_xds_addr_and_root_cert};
use crate::config::RootCert;
use crate::hyper_util::TokioExecutor;
use crate::metrics::sub_registry;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl AdsServer {
if let Err(err) = http2::Builder::new(TokioExecutor)
.serve_connection(
TokioIo::new(socket),
tower_hyper_http_body_compat::TowerService03HttpServiceAsHyper1HttpService::new(srv)
hyper_tower::TowerToHyperService::new(srv),
)
.await
{
Expand Down
28 changes: 9 additions & 19 deletions src/tls/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use crate::config::RootCert;
use crate::tls::lib::provider;
use crate::tls::{ClientCertProvider, Error, WorkloadCertificate};
use bytes::Bytes;
use http_body_1::{Body, Frame};
use hyper::body::Incoming;
use http_body::{Body, Frame};
use hyper::Uri;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
Expand All @@ -26,13 +25,11 @@ use std::future::Future;
use std::io::Cursor;
use std::pin::Pin;

use hyper::body::Incoming;
use std::task::{Context, Poll};
use std::time::Duration;

use tonic::body::BoxBody;
use tower_hyper_http_body_compat::{
http02_request_to_http1, http1_response_to_http02, HttpBody04ToHttpBody1, HttpBody1ToHttpBody04,
};

async fn root_to_store(root_cert: &RootCert) -> Result<rustls::RootCertStore, Error> {
let mut roots = rustls::RootCertStore::empty();
Expand Down Expand Up @@ -92,10 +89,11 @@ async fn control_plane_client_config(root_cert: &RootCert) -> Result<ClientConfi
.with_no_client_auth())
}

// pub type TlsGrpcChannel = hyper_util::client::legacy::Client<HttpsConnector<HttpConnector>, BoxBody>;
#[derive(Clone, Debug)]
pub struct TlsGrpcChannel {
uri: Uri,
client: hyper_util::client::legacy::Client<HttpsConnector<HttpConnector>, BoxBody1>,
client: hyper_util::client::legacy::Client<HttpsConnector<HttpConnector>, BoxBody>,
}

/// grpc_connector provides a client TLS channel for gRPC requests.
Expand Down Expand Up @@ -133,14 +131,10 @@ pub fn grpc_connector(uri: String, cc: ClientConfig) -> Result<TlsGrpcChannel, E
.timer(crate::hyper_util::TokioTimer)
.build(https);

// Ok(client)
Ok(TlsGrpcChannel { uri, client })
}

// Everything here is to hack hyper 1.0 onto tonic.
// TODO(https://github.com/hyperium/tonic/issues/1307) remove all of this and use tonic 'transport'

type BoxBody1 = HttpBody04ToHttpBody1<BoxBody>;

#[derive(Default)]
pub enum DefaultIncoming {
Some(Incoming),
Expand All @@ -165,17 +159,16 @@ impl Body for DefaultIncoming {
}
}

impl tower::Service<http_02::Request<BoxBody>> for TlsGrpcChannel {
type Response = http_02::Response<HttpBody1ToHttpBody04<DefaultIncoming>>;
impl tower::Service<http::Request<BoxBody>> for TlsGrpcChannel {
type Response = http::Response<DefaultIncoming>;
type Error = hyper_util::client::legacy::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, req: http_02::Request<BoxBody>) -> Self::Future {
let mut req = http02_request_to_http1(req.map(HttpBody04ToHttpBody1::new));
fn call(&mut self, mut req: http::Request<BoxBody>) -> Self::Future {
let mut uri = Uri::builder();
if let Some(scheme) = self.uri.scheme() {
uri = uri.scheme(scheme.to_owned());
Expand All @@ -191,10 +184,7 @@ impl tower::Service<http_02::Request<BoxBody>> for TlsGrpcChannel {
let future = self.client.request(req);
Box::pin(async move {
let res = future.await?;
Ok(http1_response_to_http02(
res.map(DefaultIncoming::Some)
.map(HttpBody1ToHttpBody04::new),
))
Ok(res.map(DefaultIncoming::Some))
})
}
}

0 comments on commit 6d01599

Please sign in to comment.