diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py index 879348feaa..82e9dab124 100644 --- a/python/qemu/aqmp/qmp_client.py +++ b/python/qemu/aqmp/qmp_client.py @@ -484,6 +484,57 @@ async def _execute(self, msg: Message, assign_id: bool = True) -> Message: exec_id = await self._issue(msg) return await self._reply(exec_id) + @upper_half + @require(Runstate.RUNNING) + async def _raw( + self, + msg: Union[Message, Mapping[str, object], bytes], + assign_id: bool = True, + ) -> Message: + """ + Issue a raw `Message` to the QMP server and await a reply. + + :param msg: + A Message to send to the server. It may be a `Message`, any + Mapping (including Dict), or raw bytes. + :param assign_id: + Assign an arbitrary execution ID to this message. If + `False`, the existing id must either be absent (and no other + such pending execution may omit an ID) or a string. If it is + a string, it must not start with '__aqmp#' and no other such + pending execution may currently be using that ID. + + :return: Execution reply from the server. + + :raise ExecInterruptedError: + When the reply could not be retrieved because the connection + was lost, or some other problem. + :raise TypeError: + When assign_id is `False`, an ID is given, and it is not a string. + :raise ValueError: + When assign_id is `False`, but the ID is not usable; + Either because it starts with '__aqmp#' or it is already in-use. + """ + # 1. convert generic Mapping or bytes to a QMP Message + # 2. copy Message objects so that we assign an ID only to the copy. + msg = Message(msg) + + exec_id = msg.get('id') + if not assign_id and 'id' in msg: + if not isinstance(exec_id, str): + raise TypeError(f"ID ('{exec_id}') must be a string.") + if exec_id.startswith('__aqmp#'): + raise ValueError( + f"ID ('{exec_id}') must not start with '__aqmp#'." + ) + + if not assign_id and exec_id in self._pending: + raise ValueError( + f"ID '{exec_id}' is in-use and cannot be used." + ) + + return await self._execute(msg, assign_id=assign_id) + @upper_half @require(Runstate.RUNNING) async def execute_msg(self, msg: Message) -> object: