Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WPB-4888: Implement request tracing across federation #3765

Merged
merged 21 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5-internal/WPB-4888
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Request tracing across federated requests
7 changes: 6 additions & 1 deletion libs/types-common/src/Data/Id.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import Data.UUID qualified as UUID
import Data.UUID.V4
import Imports
import Servant (FromHttpApiData (..), ToHttpApiData (..))
import System.Logger (ToBytes)
import Test.QuickCheck
import Test.QuickCheck.Instances ()

Expand Down Expand Up @@ -413,7 +414,8 @@ newtype RequestId = RequestId
ToByteString,
Hashable,
NFData,
Generic
Generic,
ToBytes
)

instance ToSchema RequestId where
Expand All @@ -437,6 +439,9 @@ instance EncodeWire RequestId where
instance DecodeWire RequestId where
decodeWire = fmap RequestId . decodeWire

instance FromHttpApiData RequestId where
parseUrlPiece = Right . RequestId . encodeUtf8

-- Rendering Id values in JSON objects -----------------------------------------

newtype IdObject a = IdObject {fromIdObject :: a}
Expand Down
2 changes: 1 addition & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fedQueueClient ::
FedQueueClient (NotificationComponent k) ()
fedQueueClient payload = do
env <- ask
let notif = fedNotifToBackendNotif @tag env.originDomain payload
let notif = fedNotifToBackendNotif @tag env.requestId env.originDomain payload
msg =
newMsg
{ msgBody = encode notif,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Control.Exception
import Control.Monad.Except
import Data.Aeson
import Data.Domain
import Data.Id (RequestId)
import Data.Map qualified as Map
import Data.Text qualified as Text
import Data.Text.Lazy.Encoding qualified as TL
Expand All @@ -31,7 +32,8 @@ data BackendNotification = BackendNotification
-- this body, which could be very large and completely useless to the
-- pusher. This also makes development less clunky as we don't have to
-- create a sum type here for all types of notifications that could exist.
body :: RawJson
body :: RawJson,
requestId :: Maybe RequestId
battermann marked this conversation as resolved.
Show resolved Hide resolved
}
deriving (Show, Eq)

Expand All @@ -41,7 +43,8 @@ instance ToJSON BackendNotification where
[ "ownDomain" .= notif.ownDomain,
"targetComponent" .= notif.targetComponent,
"path" .= notif.path,
"body" .= TL.decodeUtf8 notif.body.rawJsonBytes
"body" .= TL.decodeUtf8 notif.body.rawJsonBytes,
"requestId" .= notif.requestId
]

instance FromJSON BackendNotification where
Expand All @@ -51,6 +54,7 @@ instance FromJSON BackendNotification where
<*> o .: "targetComponent"
<*> o .: "path"
<*> (RawJson . TL.encodeUtf8 <$> o .: "body")
<*> o .:? "requestId"

type BackendNotificationAPI = Capture "name" Text :> ReqBody '[JSON] RawJson :> Post '[JSON] EmptyResponse

Expand All @@ -70,8 +74,8 @@ sendNotification env component path body =
runFederatorClient env . void $
clientIn (Proxy @BackendNotificationAPI) (Proxy @(FederatorClient c)) (withoutFirstSlash path) body

enqueue :: Q.Channel -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a
enqueue channel originDomain targetDomain deliveryMode (FedQueueClient action) =
enqueue :: Q.Channel -> Maybe RequestId -> Domain -> Domain -> Q.DeliveryMode -> FedQueueClient c a -> IO a
enqueue channel requestId originDomain targetDomain deliveryMode (FedQueueClient action) =
runReaderT action FedQueueEnv {..}

routingKey :: Text -> Text
Expand Down Expand Up @@ -127,7 +131,8 @@ data FedQueueEnv = FedQueueEnv
{ channel :: Q.Channel,
originDomain :: Domain,
targetDomain :: Domain,
deliveryMode :: Q.DeliveryMode
deliveryMode :: Q.DeliveryMode,
requestId :: Maybe RequestId
}

data EnqueueError = EnqueueError String
Expand Down
5 changes: 4 additions & 1 deletion libs/wire-api-federation/src/Wire/API/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import Data.ByteString.Builder
import Data.ByteString.Conversion (toByteString')
import Data.ByteString.Lazy qualified as LBS
import Data.Domain
import Data.Id
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Text.Encoding qualified as Text
Expand Down Expand Up @@ -74,7 +75,8 @@ data FederatorClientEnv = FederatorClientEnv
{ ceOriginDomain :: Domain,
ceTargetDomain :: Domain,
ceFederator :: Endpoint,
ceHttp2Manager :: Http2Manager
ceHttp2Manager :: Http2Manager,
ceOriginRequestId :: Maybe RequestId
}

data FederatorClientVersionedEnv = FederatorClientVersionedEnv
Expand Down Expand Up @@ -215,6 +217,7 @@ withHTTP2StreamingRequest successfulStatus req handleResponse = do
toList (requestHeaders req)
<> [(originDomainHeaderName, toByteString' (ceOriginDomain env))]
<> [(HTTP.hAccept, HTTP.renderHeader (toList $ req.requestAccept))]
<> [("Wire-Origin-Request-Id", rid) | rid <- toByteString' <$> maybeToList (ceOriginRequestId env)]
req' =
HTTP2.requestBuilder
(requestMethod req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Wire.API.Federation.HasNotificationEndpoint where

import Data.Aeson
import Data.Domain
import Data.Id
import Data.Kind
import Data.Proxy
import Data.Text qualified as T
Expand Down Expand Up @@ -46,10 +47,11 @@ fedNotifToBackendNotif ::
KnownSymbol (NotificationPath tag) =>
KnownComponent (NotificationComponent k) =>
ToJSON (Payload tag) =>
Maybe RequestId ->
Domain ->
Payload tag ->
BackendNotification
fedNotifToBackendNotif ownDomain payload =
fedNotifToBackendNotif mReqId ownDomain payload =
battermann marked this conversation as resolved.
Show resolved Hide resolved
let p = T.pack . symbolVal $ Proxy @(NotificationPath tag)
b = RawJson . encode $ payload
in toNotif p b
Expand All @@ -60,5 +62,6 @@ fedNotifToBackendNotif ownDomain payload =
{ ownDomain = ownDomain,
targetComponent = componentVal @(NotificationComponent k),
path = path,
body = body
body = body,
requestId = mReqId
battermann marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pushNotification runningFlag targetDomain (msg, envelope) = do
ceHttp2Manager <- asks http2Manager
let ceOriginDomain = notif.ownDomain
ceTargetDomain = targetDomain
ceOriginRequestId = notif.requestId
fcEnv = FederatorClientEnv {..}
liftIO $ either throwM pure =<< sendNotification fcEnv notif.targetComponent notif.path notif.body
lift $ ack envelope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ spec = do
{ targetComponent = Brig,
ownDomain = origDomain,
path = "/on-user-deleted-connections",
body = RawJson $ Aeson.encode notifContent
body = RawJson $ Aeson.encode notifContent,
requestId = Nothing
}
envelope <- newMockEnvelope
let msg =
Expand Down Expand Up @@ -128,7 +129,8 @@ spec = do
{ targetComponent = Brig,
ownDomain = origDomain,
path = "/on-user-deleted-connections",
body = RawJson $ Aeson.encode notifContent
body = RawJson $ Aeson.encode notifContent,
requestId = Nothing
}
envelope <- newMockEnvelope
let msg =
Expand Down
1 change: 0 additions & 1 deletion services/brig/brig.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ executable brig-integration
, unliftio
, unordered-containers
, uri-bytestring >=0.2
, uuid
, vector >=0.10
, wai
, wai-extra
Expand Down
1 change: 0 additions & 1 deletion services/brig/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ mkDerivation {
unliftio
unordered-containers
uri-bytestring
uuid
vector
wai
wai-extra
Expand Down
13 changes: 9 additions & 4 deletions services/brig/src/Brig/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-- FUTUREWORK: Remove this module all together.
module Brig.Federation.Client where

import Brig.App
import Brig.App as Brig
import Control.Lens
import Control.Monad
import Control.Monad.Catch (MonadMask, throwM)
Expand Down Expand Up @@ -161,22 +161,25 @@ notifyUserDeleted self remotes = do
. Log.field "error" (show FederationNotConfigured)

-- | Enqueues notifications in RabbitMQ. Retries 3 times with a delay of 1s.
enqueueNotification :: (MonadIO m, MonadMask m, Log.MonadLogger m) => Domain -> Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c () -> m ()
enqueueNotification :: (MonadIO m, MonadMask m, Log.MonadLogger m, MonadReader Env m) => Domain -> Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c () -> m ()
enqueueNotification ownDomain remoteDomain deliveryMode chanVar action = do
let policy = limitRetries 3 <> constantDelay 1_000_000
recovering policy [logRetries (const $ pure True) logError] (const go)
where
logError willRetry (SomeException e) status = do
rid <- view Brig.requestId
Log.err $
Log.msg @Text "failed to enqueue notification in RabbitMQ"
. Log.field "error" (displayException e)
. Log.field "willRetry" willRetry
. Log.field "retryCount" status.rsIterNumber
. Log.field "request" rid
go = do
rid <- view Brig.requestId
mChan <- timeout (1 :: Second) (readMVar chanVar)
case mChan of
Nothing -> throwM NoRabbitMqChannel
Just chan -> liftIO $ enqueue chan ownDomain remoteDomain deliveryMode action
Just chan -> liftIO $ enqueue chan (Just rid) ownDomain remoteDomain deliveryMode action

data NoRabbitMqChannel = NoRabbitMqChannel
deriving (Show)
Expand All @@ -192,12 +195,14 @@ runBrigFederatorClient targetDomain action = do
ownDomain <- viewFederationDomain
endpoint <- view federator >>= maybe (throwE FederationNotConfigured) pure
mgr <- view http2Manager
rid <- view Brig.requestId
let env =
FederatorClientEnv
{ ceOriginDomain = ownDomain,
ceTargetDomain = targetDomain,
ceFederator = endpoint,
ceHttp2Manager = mgr
ceHttp2Manager = mgr,
ceOriginRequestId = Just rid
}
liftIO (runFederatorClient env action)
>>= either (throwE . FederationCallFailure) pure
4 changes: 3 additions & 1 deletion services/cargohold/src/CargoHold/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ mkFederatorClientEnv remote = do
view (options . federator)
>>= maybe (throwE federationNotConfigured) pure
mgr <- view http2Manager
rid <- view requestId
pure
FederatorClientEnv
{ ceOriginDomain = tDomain loc,
ceTargetDomain = tDomain remote,
ceFederator = endpoint,
ceHttp2Manager = mgr
ceHttp2Manager = mgr,
ceOriginRequestId = Just rid
}

executeFederated :: Remote x -> FederatorClient 'Cargohold a -> Handler a
Expand Down
14 changes: 10 additions & 4 deletions services/federator/src/Federator/ExternalServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import Data.Bifunctor
import Data.ByteString qualified as BS
import Data.ByteString.Builder
import Data.ByteString.Lazy qualified as LBS
import Data.Default (Default (def))
import Data.Domain
import Data.Id (RequestId)
import Data.Metrics.Servant qualified as Metrics
import Data.Proxy (Proxy (Proxy))
import Data.Sequence qualified as Seq
Expand Down Expand Up @@ -86,6 +88,7 @@ data API mode = API
:- "federation"
:> Capture "component" Component
:> Capture "rpc" RPC
:> Header "Wire-Origin-Request-Id" RequestId
:> Header' '[Required, Strict] OriginDomainHeaderName Domain
:> Header' '[Required, Strict] "X-SSL-Certificate" CertHeader
:> Endpath
Expand Down Expand Up @@ -114,8 +117,8 @@ server ::
server mgr intPort interpreter =
API
{ status = Health.status mgr "internal server" intPort,
externalRequest = \component rpc remoteDomain remoteCert ->
Tagged $ \req respond -> runCodensity (interpreter (callInward component rpc remoteDomain remoteCert req)) respond
externalRequest = \component rpc mReqId remoteDomain remoteCert ->
Tagged $ \req respond -> runCodensity (interpreter (callInward component rpc mReqId remoteDomain remoteCert req)) respond
}

-- FUTUREWORK(federation): Versioning of the federation API.
Expand All @@ -132,11 +135,12 @@ callInward ::
) =>
Component ->
RPC ->
Maybe RequestId ->
Domain ->
CertHeader ->
Wai.Request ->
Sem r Wai.Response
callInward component (RPC rpc) originDomain (CertHeader cert) wreq = do
callInward component (RPC rpc) mReqId originDomain (CertHeader cert) wreq = do
incomingCounterIncr originDomain
-- only POST is supported
when (Wai.requestMethod wreq /= HTTP.methodPost) $
Expand All @@ -151,16 +155,18 @@ callInward component (RPC rpc) originDomain (CertHeader cert) wreq = do
. Log.field "originDomain" (domainText originDomain)
. Log.field "component" (show component)
. Log.field "rpc" rpc
. Log.field "request" (fromMaybe def mReqId)

validatedDomain <- validateDomain cert originDomain

let path = LBS.toStrict (toLazyByteString (HTTP.encodePathSegments ["federation", rpc]))

body <- embed $ Wai.lazyRequestBody wreq
resp <- serviceCall component path body validatedDomain
resp <- serviceCall component path body mReqId validatedDomain
Log.debug $
Log.msg ("Inward Request response" :: ByteString)
. Log.field "status" (show (responseStatusCode resp))
. Log.field "request" (fromMaybe def mReqId)
pure $
streamingResponseToWai
resp
Expand Down
10 changes: 7 additions & 3 deletions services/federator/src/Federator/InternalServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Control.Monad.Codensity
import Data.Binary.Builder
import Data.ByteString qualified as BS
import Data.Domain
import Data.Id
import Data.Metrics.Servant qualified as Metrics
import Data.Proxy
import Federator.Env
Expand Down Expand Up @@ -62,6 +63,7 @@ data API mode = API
internalRequest ::
mode
:- "rpc"
:> Header "Wire-Origin-Request-Id" RequestId
:> Capture "domain" Domain
:> Capture "component" Component
:> Capture "rpc" RPC
Expand Down Expand Up @@ -89,8 +91,8 @@ server ::
server mgr extPort interpreter =
API
{ status = Health.status mgr "external server" extPort,
internalRequest = \remoteDomain component rpc ->
Tagged $ \req respond -> runCodensity (interpreter (callOutward remoteDomain component rpc req)) respond
internalRequest = \mReqId remoteDomain component rpc ->
Tagged $ \req respond -> runCodensity (interpreter (callOutward mReqId remoteDomain component rpc req)) respond
}

callOutward ::
Expand All @@ -102,12 +104,13 @@ callOutward ::
Member Metrics r,
Member (Logger (Log.Msg -> Log.Msg)) r
) =>
Maybe RequestId ->
Domain ->
Component ->
RPC ->
Wai.Request ->
Sem r Wai.Response
callOutward targetDomain component (RPC path) req = do
callOutward mReqId targetDomain component (RPC path) req = do
-- only POST is supported
when (Wai.requestMethod req /= HTTP.methodPost) $
throw InvalidRoute
Expand All @@ -125,6 +128,7 @@ callOutward targetDomain component (RPC path) req = do
. Log.field "body" body
resp <-
discoverAndCall
mReqId
targetDomain
component
path
Expand Down
2 changes: 1 addition & 1 deletion services/federator/src/Federator/MockServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mockServer ::
mockServer remoteCalls headers resp interpreter =
Federator.InternalServer.API
{ status = const $ pure NoContent,
internalRequest = \targetDomain component rpc ->
internalRequest = \_mReqId targetDomain component rpc ->
battermann marked this conversation as resolved.
Show resolved Hide resolved
Tagged $ \req respond ->
respond =<< interpreter (mockInternalRequest remoteCalls headers resp targetDomain component rpc req)
}
Expand Down
Loading
Loading