Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
364 changes: 225 additions & 139 deletions src/agent-client-protocol-core/src/jsonrpc.rs

Large diffs are not rendered by default.

237 changes: 86 additions & 151 deletions src/agent-client-protocol-core/src/jsonrpc/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::jsonrpc::{HandleDispatchFrom, Handled, IntoHandled, JsonRpcResponse};
use crate::jsonrpc::{
HandleDispatchFrom, Handled, IntoHandled, NotificationMatch, RequestMatch, TypedDispatchMatch,
};

use crate::role::{HasPeer, Role, handle_incoming_dispatch};
use crate::{ConnectionTo, Dispatch, JsonRpcNotification, JsonRpcRequest, UntypedMessage};
Expand Down Expand Up @@ -108,67 +110,44 @@ where
dispatch,
connection,
async |dispatch, connection| {
match dispatch {
Dispatch::Request(message, responder) => {
tracing::debug!(
request_type = std::any::type_name::<Req>(),
message = ?message,
"RequestHandler::handle_request"
);
if Req::matches_method(&message.method) {
match Req::parse_message(&message.method, &message.params) {
Ok(req) => {
tracing::trace!(
?req,
"RequestHandler::handle_request: parse completed"
);
let typed_responder = responder.cast();
let result = (self.to_future_hack)(
&mut self.handler,
req,
typed_responder,
connection,
)
.await?;
match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: (request, responder),
retry,
} => {
// Handler returned the request back, convert to untyped
let untyped = request.to_untyped_message()?;
Ok(Handled::No {
message: Dispatch::Request(
untyped,
responder.erase_to_json(),
),
retry,
})
}
}
}
Err(err) => {
tracing::trace!(
?err,
"RequestHandler::handle_request: parse errored"
);
Err(err)
}
}
} else {
tracing::trace!("RequestHandler::handle_request: method doesn't match");
Ok(Handled::No {
message: Dispatch::Request(message, responder),
retry: false,
})
if let Dispatch::Request(message, _) = &dispatch {
tracing::debug!(
request_type = std::any::type_name::<Req>(),
message = ?message,
"RequestHandler::handle_request"
);
}
match dispatch.match_request::<Req>() {
RequestMatch::Matched(req, typed_responder) => {
tracing::trace!(?req, "RequestHandler::handle_request: parse completed");
let result = (self.to_future_hack)(
&mut self.handler,
req,
typed_responder,
connection,
)
.await?;
match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: (request, responder),
retry,
} => Dispatch::<Req, UntypedMessage>::Request(request, responder)
.erase_into_unhandled(retry),
}
}

Dispatch::Notification(..) | Dispatch::Response(..) => Ok(Handled::No {
message: dispatch,
retry: false,
}),
RequestMatch::Unhandled(dispatch) => {
tracing::trace!("RequestHandler::handle_request: method doesn't match");
Ok(Handled::No {
message: dispatch,
retry: false,
})
}
RequestMatch::Rejected { dispatch, error } => {
tracing::trace!(?error, "RequestHandler::handle_request: parse errored");
dispatch.respond_with_error(error, connection)?;
Ok(Handled::Yes)
}
}
},
)
Expand Down Expand Up @@ -236,61 +215,47 @@ where
dispatch,
connection,
async |dispatch, connection| {
match dispatch {
Dispatch::Notification(message) => {
tracing::debug!(
request_type = std::any::type_name::<Notif>(),
message = ?message,
"NotificationHandler::handle_dispatch"
if let Dispatch::Notification(message) = &dispatch {
tracing::debug!(
request_type = std::any::type_name::<Notif>(),
message = ?message,
"NotificationHandler::handle_dispatch"
);
}
match dispatch.match_notification::<Notif>() {
NotificationMatch::Matched(notif) => {
tracing::trace!(
?notif,
"NotificationHandler::handle_notification: parse completed"
);
if Notif::matches_method(&message.method) {
match Notif::parse_message(&message.method, &message.params) {
Ok(notif) => {
tracing::trace!(
?notif,
"NotificationHandler::handle_notification: parse completed"
);
let result =
(self.to_future_hack)(&mut self.handler, notif, connection)
.await?;
match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: (notification, _cx),
retry,
} => {
// Handler returned the notification back, convert to untyped
let untyped = notification.to_untyped_message()?;
Ok(Handled::No {
message: Dispatch::Notification(untyped),
retry,
})
}
}
}
Err(err) => {
tracing::trace!(
?err,
"NotificationHandler::handle_notification: parse errored"
);
Err(err)
}
}
} else {
tracing::trace!(
"NotificationHandler::handle_notification: method doesn't match"
);
Ok(Handled::No {
message: Dispatch::Notification(message),
retry: false,
})
let result =
(self.to_future_hack)(&mut self.handler, notif, connection).await?;
match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: (notification, _cx),
retry,
} => Dispatch::<UntypedMessage, Notif>::Notification(notification)
.erase_into_unhandled(retry),
}
}

Dispatch::Request(..) | Dispatch::Response(..) => Ok(Handled::No {
message: dispatch,
retry: false,
}),
NotificationMatch::Unhandled(dispatch) => {
tracing::trace!(
"NotificationHandler::handle_notification: method doesn't match"
);
Ok(Handled::No {
message: dispatch,
retry: false,
})
}
NotificationMatch::Rejected { dispatch, error } => {
tracing::trace!(
?error,
"NotificationHandler::handle_notification: parse errored"
);
dispatch.respond_with_error(error, connection)?;
Ok(Handled::Yes)
}
}
},
)
Expand Down Expand Up @@ -362,57 +327,27 @@ where
self.peer.clone(),
dispatch,
connection,
async |dispatch, connection| match dispatch.into_typed_dispatch::<Req, Notif>()? {
Ok(typed_dispatch) => {
async |dispatch, connection| match dispatch.match_typed_dispatch::<Req, Notif>() {
TypedDispatchMatch::Matched(typed_dispatch) => {
let result =
(self.to_future_hack)(&mut self.handler, typed_dispatch, connection)
.await?;
match result.into_handled() {
Handled::Yes => Ok(Handled::Yes),
Handled::No {
message: Dispatch::Request(request, responder),
message: typed_dispatch,
retry,
} => {
let untyped = request.to_untyped_message()?;
Ok(Handled::No {
message: Dispatch::Request(untyped, responder.erase_to_json()),
retry,
})
}
Handled::No {
message: Dispatch::Notification(notification),
retry,
} => {
let untyped = notification.to_untyped_message()?;
Ok(Handled::No {
message: Dispatch::Notification(untyped),
retry,
})
}
Handled::No {
message: Dispatch::Response(result, responder),
retry,
} => {
let method = responder.method();
let untyped_result = match result {
Ok(response) => response.into_json(method).map(Ok),
Err(err) => Ok(Err(err)),
}?;
Ok(Handled::No {
message: Dispatch::Response(
untyped_result,
responder.erase_to_json(),
),
retry,
})
}
} => typed_dispatch.erase_into_unhandled(retry),
}
}

Err(dispatch) => Ok(Handled::No {
TypedDispatchMatch::Unhandled(dispatch) => Ok(Handled::No {
message: dispatch,
retry: false,
}),
TypedDispatchMatch::Rejected { dispatch, error } => {
dispatch.respond_with_error(error, connection)?;
Ok(Handled::Yes)
}
},
)
.await
Expand Down
3 changes: 2 additions & 1 deletion src/agent-client-protocol-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ pub mod jsonrpcmsg {
pub use jsonrpc::{
Builder, ByteStreams, Channel, ConnectionTo, Dispatch, HandleDispatchFrom, Handled,
IntoHandled, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Lines,
NullHandler, Responder, ResponseRouter, SentRequest, UntypedMessage,
NotificationMatch, NullHandler, RequestMatch, Responder, ResponseRouter, SentRequest,
TypedDispatchMatch, UntypedMessage,
run::{ChainRun, NullRun, RunWithConnectionTo},
};

Expand Down
10 changes: 8 additions & 2 deletions src/agent-client-protocol-core/src/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{any::TypeId, fmt::Debug, future::Future, hash::Hash};
use serde::{Deserialize, Serialize};

use crate::schema::{METHOD_SUCCESSOR_MESSAGE, SuccessorMessage};
use crate::util::json_cast;
use crate::{Builder, ConnectionTo, Dispatch, Handled, JsonRpcMessage, UntypedMessage};

/// Roles for the ACP protocol.
Expand Down Expand Up @@ -230,7 +229,14 @@ where
"Response variant cannot be unwrapped as SuccessorMessage",
)
})?;
let SuccessorMessage { message, meta } = json_cast(untyped_message.params())?;
let SuccessorMessage { message, meta } =
match crate::util::json_cast_params(untyped_message.params()) {
Ok(message) => message,
Err(error) => {
dispatch.respond_with_error(error, connection.clone())?;
return Ok(Handled::Yes);
}
};
let successor_dispatch = dispatch.try_map_message(|_| Ok(message))?;
tracing::trace!(
unwrapped_method = %successor_dispatch.method(),
Expand Down
9 changes: 8 additions & 1 deletion src/agent-client-protocol-core/src/role/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,14 @@ where
MatchDispatchFrom::new(message, &connection)
.if_message_from(Agent, async |message| {
// If this is for our session-id, proxy it to the client.
if let Some(session_id) = message.get_session_id()?
let session_id = match message.get_session_id() {
Ok(session_id) => session_id,
Err(error) => {
message.respond_with_error(error, connection.clone())?;
return Ok(Handled::Yes);
}
};
if let Some(session_id) = session_id
&& session_id == self.session_id
{
connection.send_proxied_message_to(Client, message)?;
Expand Down
12 changes: 6 additions & 6 deletions src/agent-client-protocol-core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ macro_rules! impl_jsonrpc_request {
if method != $method {
return Err($crate::Error::method_not_found());
}
$crate::util::json_cast(params)
$crate::util::json_cast_params(params)
}
}

Expand Down Expand Up @@ -84,7 +84,7 @@ macro_rules! impl_jsonrpc_notification {
if method != $method {
return Err($crate::Error::method_not_found());
}
$crate::util::json_cast(params)
$crate::util::json_cast_params(params)
}
}

Expand Down Expand Up @@ -133,10 +133,10 @@ macro_rules! impl_jsonrpc_request_enum {
params: &impl serde::Serialize,
) -> Result<Self, $crate::Error> {
match method {
$( $(#[$meta])* $method => $crate::util::json_cast(params).map(Self::$variant), )*
$( $(#[$meta])* $method => $crate::util::json_cast_params(params).map(Self::$variant), )*
_ => {
if let Some(custom_method) = method.strip_prefix('_') {
$crate::util::json_cast(params).map(
$crate::util::json_cast_params(params).map(
|ext_req: $crate::schema::ExtRequest| {
Self::$ext_variant($crate::schema::ExtRequest::new(
custom_method.to_string(),
Expand Down Expand Up @@ -196,10 +196,10 @@ macro_rules! impl_jsonrpc_notification_enum {
params: &impl serde::Serialize,
) -> Result<Self, $crate::Error> {
match method {
$( $(#[$meta])* $method => $crate::util::json_cast(params).map(Self::$variant), )*
$( $(#[$meta])* $method => $crate::util::json_cast_params(params).map(Self::$variant), )*
_ => {
if let Some(custom_method) = method.strip_prefix('_') {
$crate::util::json_cast(params).map(
$crate::util::json_cast_params(params).map(
|ext_notif: $crate::schema::ExtNotification| {
Self::$ext_variant($crate::schema::ExtNotification::new(
custom_method.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions src/agent-client-protocol-core/src/schema/proxy_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<M: JsonRpcMessage> JsonRpcMessage for SuccessorMessage<M> {
if method != METHOD_SUCCESSOR_MESSAGE {
return Err(crate::Error::method_not_found());
}
let outer = crate::util::json_cast::<_, SuccessorMessage<UntypedMessage>>(params)?;
let outer = crate::util::json_cast_params::<_, SuccessorMessage<UntypedMessage>>(params)?;
if !M::matches_method(&outer.message.method) {
return Err(crate::Error::method_not_found());
}
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<M: JsonRpcMessage> JsonRpcMessage for McpOverAcpMessage<M> {
if method != METHOD_MCP_MESSAGE {
return Err(crate::Error::method_not_found());
}
let outer = crate::util::json_cast::<_, McpOverAcpMessage<UntypedMessage>>(params)?;
let outer = crate::util::json_cast_params::<_, McpOverAcpMessage<UntypedMessage>>(params)?;
if !M::matches_method(&outer.message.method) {
return Err(crate::Error::method_not_found());
}
Expand Down
Loading
Loading