krha/krha.py

75 lines
2.3 KiB
Python
Raw Normal View History

2025-01-14 20:28:06 -07:00
#! /usr/bin/env python3
2025-01-14 18:26:40 -07:00
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
2025-01-14 20:27:03 -07:00
import requests
import json
import os
2025-01-14 18:26:40 -07:00
# Install the GLib main loop as dbus-python's default. This has to run before
# the session bus connection is opened (done when Runner is instantiated).
DBusGMainLoop(set_as_default=True)

# Object path under which the Runner object is exported on the bus.
objpath = "/krha"

# Tell `requests` which CA bundle to use for TLS verification.
# NOTE(review): this overwrites any REQUESTS_CA_BUNDLE already present in the
# environment and assumes the bundle exists at this path -- confirm that is
# intended on every target system.
os.environ['REQUESTS_CA_BUNDLE'] = '/etc/ssl/certs/ca-certificates.crt'

# D-Bus interface name on which the Runner methods are exported.
iface = "org.kde.krunner1"
2025-01-14 18:26:40 -07:00
class Runner(dbus.service.Object):
    """D-Bus runner that forwards ``ha <text>`` queries to Home Assistant.

    The object claims the bus name ``dev.suah.krha`` on the session bus and is
    exported at the module-level ``objpath``. Its methods are published on the
    module-level ``iface`` interface.

    Configuration is read once, at construction time, from the environment:

    * ``HA_API_KEY`` -- token sent as ``Authorization: Bearer <token>``.
    * ``HA_URL`` -- base URL of the Home Assistant instance (trailing slashes
      are stripped so the API path can be appended safely).
    """

    # Query prefix (including the separating space) handled by this runner.
    _QUERY_PREFIX = "ha "

    # Seconds to wait for Home Assistant before giving up. Without a timeout
    # `requests` waits indefinitely, which would block this D-Bus service.
    _REQUEST_TIMEOUT = 10

    def __init__(self):
        """Claim the bus name, export the object and load the configuration."""
        bus_name = dbus.service.BusName("dev.suah.krha", dbus.SessionBus())
        super().__init__(bus_name, objpath)
        self.api_key = os.environ.get("HA_API_KEY", "")
        self.ha_url = os.environ.get("HA_URL", "").rstrip('/')

    @dbus.service.method(iface, in_signature='s', out_signature='a(sssida{sv})')
    def Match(self, query: str):
        """Return the match entries for *query*.

        Args:
            query: Text typed by the user.

        Returns:
            A list of ``(data, text, icon, type, relevance, properties)``
            tuples: empty when the query does not start with ``"ha "``, a
            single usage hint when nothing but whitespace follows the prefix,
            otherwise a single entry whose ``data`` is the command text.
        """
        if not query.startswith(self._QUERY_PREFIX):
            return []

        # Strip surrounding whitespace so that "ha    " is treated as "no
        # command yet" instead of offering to send blank text.
        command = query[len(self._QUERY_PREFIX):].strip()
        if not command:
            return [(
                "ha",                       # data
                "Home Assistant Commands",  # text
                "home",                     # icon
                100,                        # type
                1.0,                        # relevance
                {'subtext': 'Type a command after "ha" to control Home Assistant'},
            )]

        return [(
            command,                                     # data
            f"Send to Home Assistant: {command}",        # text
            "home",                                      # icon
            100,                                         # type
            1.0,                                         # relevance
            {'subtext': 'Press Enter to send command'},  # properties
        )]

    @dbus.service.method(iface, out_signature='a(sss)')
    def Actions(self):
        """Return the available actions as ``(id, text, icon)`` tuples."""
        return [("send", "Send to Home Assistant", "home")]

    @dbus.service.method(iface, in_signature='ss')
    def Run(self, data: str, action_id: str):
        """Send *data* to Home Assistant's ``/api/conversation/process`` endpoint.

        Progress and failures are reported on stdout; nothing is returned to
        the D-Bus caller.

        Args:
            data: The ``data`` field of the selected match (the command text).
            action_id: Identifier of the chosen action; only printed.
        """
        print(data, action_id)
        if not self.api_key or not self.ha_url:
            print("Error: HA_API_KEY or HA_URL not configured")
            return

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        try:
            response = requests.post(
                f"{self.ha_url}/api/conversation/process",
                headers=headers,
                json={"text": data},
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            # RequestException covers connection, timeout, URL and HTTP status
            # errors; ValueError covers a response body that is not JSON.
            print(f"Error sending command to Home Assistant: {e}")
            return

        print(f"Response from Home Assistant: {json.dumps(result, indent=2)}")
2025-01-14 18:26:40 -07:00
# Instantiate the service object; the reference is held for the lifetime of
# the process while the main loop below is running.
service = Runner()

# Hand control to the GLib main loop; this call blocks.
main_loop = GLib.MainLoop()
main_loop.run()