"""D-Bus interfaces that defines and powers our commands."""
import logging
from dataclasses import dataclass
from typing import Any
from dasbus.server.interface import dbus_interface
from dasbus.server.template import InterfaceTemplate
from dasbus.typing import Str, Structure
from command_line_assistant.constants import VERSION
from command_line_assistant.daemon.database.manager import DatabaseManager
from command_line_assistant.daemon.database.repository.chat import ChatRepository
from command_line_assistant.daemon.http.query import submit
from command_line_assistant.daemon.session import UserSessionManager
from command_line_assistant.dbus.constants import CHAT_IDENTIFIER
from command_line_assistant.dbus.context import DaemonContext
from command_line_assistant.dbus.exceptions import ChatNotFoundError
from command_line_assistant.dbus.interfaces.authorization import DBusAuthorizationMixin
from command_line_assistant.dbus.sender_context import get_current_sender
from command_line_assistant.dbus.structures.chat import (
ChatEntry,
ChatList,
Question,
Response,
)
# Module-level logger named after this module; the code below uses it for
# regular log messages and for records flagged with ``extra={"audit": True}``.
logger = logging.getLogger(__name__)
@dataclass
class InferencePayload:
    """Wrap a question so it can be serialised for the inference backend."""

    # The question structure this payload is built from. ``to_dict`` reads its
    # message, stdin, attachment, terminal and systeminfo attributes.
    content: Question

    def to_dict(self) -> dict[str, Any]:
        """Serialise the wrapped question into a plain dictionary.

        Returns:
            dict[str, Any]: A mapping with the ``question`` text and a
                ``context`` section holding the stdin, attachment, terminal
                output, system information and assistant version.
        """
        question = self.content
        payload: dict[str, Any] = {"question": question.message}

        context: dict[str, Any] = {"stdin": question.stdin.stdin}

        attachment = question.attachment
        context["attachments"] = {
            "contents": attachment.contents,
            "mimetype": attachment.mimetype,
        }

        context["terminal"] = {"output": question.terminal.output}

        system = question.systeminfo
        context["systeminfo"] = {
            "os": system.os,
            "version": system.version,
            "arch": system.arch,
            "id": system.id,
        }

        # The assistant's own version is sent alongside the user context.
        context["cla"] = {"version": VERSION}

        payload["context"] = context
        return payload
@dbus_interface(CHAT_IDENTIFIER.interface_name)
class ChatInterface(InterfaceTemplate, DBusAuthorizationMixin):
    """The DBus interface of a query.

    Exposes chat-session management (list, create, look up, delete) and
    question submission. Every public method authorizes the D-Bus sender
    against the requested ``user_id`` before doing any other work.
    """

    def __init__(self, implementation: DaemonContext) -> None:
        """Constructor of the class.

        Arguments:
            implementation (DaemonContext): The implementation context to be used in an interface.
        """
        super().__init__(implementation)
        self._db_manager = DatabaseManager(implementation.config)
        self._chat_repository = ChatRepository(self._db_manager)
        self._session_manager = UserSessionManager()

    def _verify_caller_authorization(self, sender: str, requested_user_id: str) -> None:
        """Verify that the caller is authorized to access the requested user's data.

        The check is delegated to the inherited
        ``_verify_internal_user_authorization`` together with this interface's
        session manager.

        Arguments:
            sender (str): The D-Bus sender.
            requested_user_id (str): The user ID being requested in the method call.

        Raises:
            PermissionError: Expected when the caller's user ID doesn't match the
                requested user ID. NOTE(review): raised by the inherited check,
                which is not visible in this module -- confirm there.
        """
        self._verify_internal_user_authorization(
            sender, requested_user_id, self._session_manager
        )

    def _authorize_current_sender(self, user_id: str) -> None:
        """Authorize the sender of the D-Bus call currently being handled.

        Arguments:
            user_id (str): The user ID the current call wants to act on.
        """
        sender = get_current_sender()
        self._verify_caller_authorization(sender, user_id)

    def GetAllChatFromUser(self, user_id: Str) -> Structure:
        """Get all the chat sessions for a given user.

        Arguments:
            user_id (Str): The identifier of the user.

        Returns:
            Structure: The list of chat sessions.
        """
        self._authorize_current_sender(user_id)

        chats = self._chat_repository.select_all_by_user_id(user_id)
        # Identifiers and timestamps are converted with ``str()`` before being
        # placed in the entry, so a ``None`` value is sent as the text "None".
        chat_entries = [
            ChatEntry(
                str(entry.id),
                entry.name,
                entry.description,
                str(entry.created_at),
                str(entry.updated_at),
                str(entry.deleted_at),
            )
            for entry in chats
        ]
        return ChatList(chat_entries).structure()

    def DeleteAllChatForUser(self, user_id: Str) -> None:
        """Delete all chats from the user.

        Arguments:
            user_id (Str): The identifier of the user.

        Raises:
            ChatNotFoundError: In case no chat was found for the current user.
        """
        self._authorize_current_sender(user_id)

        all_chats = self._chat_repository.select_all_by_user_id(user_id)
        if not all_chats:
            raise ChatNotFoundError("No chat found to delete.")

        for chat in all_chats:
            # One audit record per chat, written before the delete is issued.
            logger.info(
                "Deleting chat for user.",
                extra={"audit": True, "chat_id": chat.id, "user_id": user_id},
            )
            self._chat_repository.delete(chat.id)

    def DeleteChatForUser(self, user_id: Str, name: Str) -> None:
        """Delete a specific chat for a user.

        Arguments:
            user_id (Str): The identifier of the user.
            name (Str): The name of the chat.

        Raises:
            ChatNotFoundError: In case no chat was found with the given name for the current user.
        """
        self._authorize_current_sender(user_id)

        logger.info(
            "Looking for chat associated with the user.",
            extra={"audit": True, "chat_name": name, "user_id": user_id},
        )
        matches = self._chat_repository.select_by_name(user_id, name)
        if not matches:
            logger.info(
                "Couldn't find chat with name '%s' for user '%s'. Either the name is not correct or the chat does not exist.",
                name,
                user_id,
            )
            raise ChatNotFoundError(
                f"Couldn't find chat with name '{name}'. Check the name requested and try again."
            )

        # Only the first match returned by the repository is deleted.
        chat_id = matches[0].id
        logger.info(
            "Deleting the request chat for user.",
            extra={"audit": True, "chat_id": chat_id, "user_id": user_id},
        )
        self._chat_repository.delete(chat_id)

    def GetLatestChatFromUser(self, user_id: Str) -> Str:
        """Get the latest chat session for a given user.

        Arguments:
            user_id (Str): The identifier of the user.

        Raises:
            ChatNotFoundError: If the repository returns no chat for the user.

        Returns:
            Str: The identifier of the chat session.
        """
        self._authorize_current_sender(user_id)

        result = self._chat_repository.select_latest_chat(user_id)
        # Guard against a missing chat so the caller gets the same error type
        # as the other lookups instead of an AttributeError on ``None.id``.
        if result is None:
            raise ChatNotFoundError("No chat found for the user.")
        return str(result.id)

    def IsChatAvailable(self, user_id: Str, name: Str) -> bool:
        """Check if a chat session is available for a given user.

        Arguments:
            user_id (Str): The identifier of the user.
            name (Str): The name of the chat.

        Returns:
            bool: True if the chat session is available, False otherwise.
        """
        self._authorize_current_sender(user_id)

        result = self._chat_repository.select_by_name(user_id, name)
        if not result:
            logger.info(
                "Chat session with name '%s' not found for user '%s'.",
                name,
                user_id,
            )
            return False

        logger.info(
            "Chat session with name '%s' found for user '%s'.",
            name,
            user_id,
        )
        return True

    def GetChatId(self, user_id: Str, name: Str) -> Str:
        """Get the chat id for a given user and chat name.

        Arguments:
            user_id (Str): The identifier of the user.
            name (Str): The name of the chat.

        Raises:
            ChatNotFoundError: If the chat is not found.

        Returns:
            Str: The identifier of the chat session.
        """
        self._authorize_current_sender(user_id)

        result = self._chat_repository.select_by_name(user_id, name)
        if not result:
            raise ChatNotFoundError(
                f"No chat found with name '{name}'. Please, make sure that this chat exist first."
            )

        # The first match returned by the repository is the one reported.
        chat_id = result[0].id
        logger.info(
            "Found existing chat with id '%s' and name '%s' for user '%s'",
            chat_id,
            name,
            user_id,
        )
        return str(chat_id)

    def CreateChat(self, user_id: Str, name: Str, description: Str) -> Str:
        """Create a new chat session for a given conversation.

        Arguments:
            user_id (Str): The identifier of the user.
            name (Str): The name of the chat.
            description (Str): A description for the chat to identify the context of it.

        Returns:
            Str: The identifier of the chat session.
        """
        self._authorize_current_sender(user_id)

        identifier = self._chat_repository.insert(
            {"user_id": user_id, "name": name, "description": description}
        )
        logger.info(
            "New chat session created for user.",
            extra={"audit": True, "identifier": identifier, "chat_name": name},
        )
        # The insert result is indexable; its first element is returned as the
        # chat identifier.
        return str(identifier[0])

    def AskQuestion(self, user_id: Str, message_input: Structure) -> Structure:
        """This method is mainly called by the client to retrieve its answer.

        Arguments:
            user_id (Str): The identifier of the user.
            message_input (Structure): The message input in format of a d-bus structure.

        Returns:
            Structure: The message output in format of a d-bus structure.
        """
        self._authorize_current_sender(user_id)

        # Rebuild the question from the D-Bus structure and wrap it for the backend.
        content = Question.from_structure(message_input)
        payload = InferencePayload(content)

        # The audit record carries only the user id, not the question content.
        logger.info(
            "Submitting question from user.",
            extra={
                "audit": True,
                "user": user_id,
            },
        )
        llm_response = submit(payload.to_dict(), self.implementation.config)

        # Wrap the backend answer and hand it back as a D-Bus structure.
        return Response(llm_response).structure()