#!/usr/bin/python
#
# An applet that watches the Athena nologin flag file
# (/var/run/athena-nologin) and shows a "workstation unavailable"
# notice window while that file exists.
#
# NOTE: this is Python 2 / PyGTK code (gio, gtk, gobject, dbus-python).

import gio
import dbus
import dbus.mainloop.glib
import gtk
import gobject
import sys
import os
import subprocess
import time
from optparse import OptionParser

SM_DBUS_NAME = "org.gnome.SessionManager"
SM_DBUS_PATH = "/org/gnome/SessionManager"
SM_DBUS_INTERFACE = "org.gnome.SessionManager"
SM_CLIENT_DBUS_INTERFACE = "org.gnome.SessionManager.ClientPrivate"
APP_ID = "debathena-nologin-monitor"

# Flag file whose presence triggers the notice window.
NOLOGIN_PATH = "/var/run/athena-nologin"

# Metapackage name (as reported by "machtype -L") on which the notice
# is active outside of --test mode.
CLUSTER_METAPACKAGE = 'debathena-cluster'

# Pango markup shown in the notice; the single %s is the timestamp.
UPDATE_MESSAGE_MARKUP = (
    'Software updates are being applied.\n\n'
    'This workstation is temporarily unavailable.\n\n'
    'Please use another workstation.\n\n'
    '(Update started at %s)'
)
UPDATE_TIME_FORMAT = "%Y-%m-%d %H:%M"


class GDMSucks:
    """Notice window driven by the presence of NOLOGIN_PATH.

    On construction the applet tries to register with the GNOME session
    manager (best effort), builds an undecorated 800x600 window holding
    the notice text, and -- when running in --test mode or when
    "machtype -L" reports CLUSTER_METAPACKAGE -- installs a gio file
    monitor on NOLOGIN_PATH that shows the window when the file is
    created and hides it when the file is deleted.
    """

    def __init__(self, options):
        """Set up session-manager hooks, the window and the file monitor.

        options -- parsed command-line options; the attributes read are
                   ``debug`` (--test) and ``guitest`` (--test-gui).
        """
        self.debug = options.debug
        self.guitest = options.guitest
        # Set by the session-manager callbacks below; not read anywhere
        # else in this file.
        self.sessionEnding = False
        self.sessionBus = dbus.SessionBus()
        try:
            self.register_with_sm()
            self.init_sm_client()
        except Exception as e:
            # Registration is best-effort: the applet is still useful
            # without it.  Catch Exception rather than using a bare
            # except so KeyboardInterrupt/SystemExit are not swallowed.
            print("Warning: Cannot register with session manager.")
            if self.debug:
                print("%s" % (e,))

        self._build_window()
        if self.guitest:
            # --test-gui: show the window unconditionally.
            self.window.show_all()

        active = self.debug or self._metapackage() == CLUSTER_METAPACKAGE
        if active and os.path.isfile(NOLOGIN_PATH):
            # The flag file already exists at startup.
            self.window.show_all()
        if active:
            self.gfile = gio.File(NOLOGIN_PATH)
            self.monitor = self.gfile.monitor_file(gio.FILE_MONITOR_NONE,
                                                   None)
            self.monitor.connect("changed", self.directory_changed)

    def _build_window(self):
        """Create the (initially hidden) notice window and its label."""
        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.set_property('can-focus', False)
        box = gtk.VBox()
        self.label = gtk.Label()
        self._refresh_label()
        self.label.set_property('can-focus', False)
        self.label.set_justify(gtk.JUSTIFY_CENTER)
        box.pack_start(self.label, True, True, 5)
        self.window.add(box)
        self.window.set_size_request(800, 600)
        self.window.set_decorated(False)
        self.window.set_position(gtk.WIN_POS_CENTER)
        self.window.hide()

    def _refresh_label(self):
        """Set the notice text, stamped with the current local time."""
        stamp = time.strftime(UPDATE_TIME_FORMAT)
        self.label.set_markup(UPDATE_MESSAGE_MARKUP % (stamp))

    def _metapackage(self):
        """Return the output of "machtype -L" with trailing space stripped.

        If the command cannot be executed (OSError), assume a cluster
        machine and return CLUSTER_METAPACKAGE.
        """
        try:
            proc = subprocess.Popen(["machtype", "-L"],
                                    stdout=subprocess.PIPE)
            return proc.communicate()[0].rstrip()
        except OSError:
            # Assume cluster
            return CLUSTER_METAPACKAGE

    def directory_changed(self, monitor, file1, file2, evt_type):
        """Handle a "changed" signal from the file monitor.

        Shows the window (with a fresh timestamp) when the flag file is
        created and hides it when the file is deleted; other event types
        are ignored.  In --test mode every event is printed.
        """
        if self.debug:
            # Single formatted argument so the output matches the
            # original Python 2 "print a, b" statement.
            print("%s %s" % (str(evt_type), file1.get_path()))
        if evt_type == gio.FILE_MONITOR_EVENT_CREATED:
            self._refresh_label()
            self.window.show_all()
        if evt_type == gio.FILE_MONITOR_EVENT_DELETED:
            self.window.hide()

    # Connect to the session manager, and register our client.
    def register_with_sm(self):
        """Register APP_ID with the session manager; store the client id.

        The autostart id is taken from the DESKTOP_AUTOSTART_ID
        environment variable, defaulting to the empty string.
        """
        proxy = self.sessionBus.get_object(SM_DBUS_NAME, SM_DBUS_PATH)
        sm = dbus.Interface(proxy, SM_DBUS_INTERFACE)
        autostart_id = os.getenv("DESKTOP_AUTOSTART_ID", default="")
        self.smClientId = sm.RegisterClient(APP_ID, autostart_id)

    # Set up to handle signals from the session manager.
    def init_sm_client(self):
        """Subscribe to the session manager's client signals.

        Requires self.smClientId, i.e. register_with_sm() must have
        succeeded first.
        """
        proxy = self.sessionBus.get_object(SM_DBUS_NAME, self.smClientId)
        self.smClient = dbus.Interface(proxy, SM_CLIENT_DBUS_INTERFACE)
        self.smClient.connect_to_signal("QueryEndSession",
                                        self.sm_on_QueryEndSession)
        self.smClient.connect_to_signal("EndSession",
                                        self.sm_on_EndSession)
        self.smClient.connect_to_signal("CancelEndSession",
                                        self.sm_on_CancelEndSession)
        self.smClient.connect_to_signal("Stop", self.sm_on_Stop)

    # Here on a QueryEndSession signal from the session manager.
    def sm_on_QueryEndSession(self, flags):
        """Note that the session is ending and reply OK (flags unused)."""
        self.sessionEnding = True
        # Response args: is_ok, reason.
        self.smClient.EndSessionResponse(True, "")

    # Here on an EndSession signal from the session manager.
    def sm_on_EndSession(self, flags):
        """Note that the session is ending and reply OK (flags unused)."""
        self.sessionEnding = True
        # Response args: is_ok, reason.
        self.smClient.EndSessionResponse(True, "")

    # Here on a CancelEndSession signal from the session manager.
    def sm_on_CancelEndSession(self):
        """Clear the session-ending flag."""
        self.sessionEnding = False

    # Here on a Stop signal from the session manager.
    def sm_on_Stop(self):
        """Leave the GTK main loop."""
        gtk.main_quit()


def main(options):
    """Create the applet and run the GTK main loop until it quits."""
    # Must be installed before any bus connection is made so that D-Bus
    # signals are dispatched from the GLib main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    # Hold a reference for the lifetime of the main loop; if session
    # manager registration failed, nothing else visibly keeps the
    # instance (and its file monitor) referenced.
    applet = GDMSucks(options)
    gtk.main()


if __name__ == '__main__':
    parser = OptionParser()
    parser.set_defaults(debug=False, guitest=False)
    parser.add_option("--test", action="store_true", dest="debug")
    parser.add_option("--test-gui", action="store_true", dest="guitest")
    (options, args) = parser.parse_args()
    main(options)