#!/usr/bin/python
#
# An applet that monitors /var/run/athena-nologin and displays a notice
# window while that file exists.
import gio
import dbus
import dbus.mainloop.glib
import gtk
import gobject
import sys
import os
import subprocess
import time
from optparse import OptionParser
# Bus name used to obtain proxies for the session manager and for our
# registered client object.
SM_DBUS_NAME = "org.gnome.SessionManager"
# Object path of the session manager itself.
SM_DBUS_PATH = "/org/gnome/SessionManager"
# Interface on the session manager object that provides RegisterClient.
SM_DBUS_INTERFACE = "org.gnome.SessionManager"
# Interface on the per-client object: source of the QueryEndSession,
# EndSession, CancelEndSession and Stop signals, and of EndSessionResponse.
SM_CLIENT_DBUS_INTERFACE = "org.gnome.SessionManager.ClientPrivate"
# Application id passed to RegisterClient.
APP_ID = "debathena-nologin-monitor"
class GDMSucks:
    """Notice window that appears while the nologin file exists.

    The applet builds an undecorated, centered 800x600 window carrying a
    "software updates are being applied" message.  When run with the debug
    option, or when ``machtype -L`` reports the cluster metapackage, it
    watches NOLOGIN_FILE: the window is shown when the file is created and
    hidden when it is deleted.

    It also tries to register with the GNOME session manager so that it can
    acknowledge end-of-session queries and quit on a Stop signal.  Failure
    to register is reported with a warning and is otherwise not fatal.
    """

    # File whose presence triggers the notice.
    NOLOGIN_FILE = "/var/run/athena-nologin"
    # Value of `machtype -L` for which monitoring is enabled.
    CLUSTER_METAPACKAGE = 'debathena-cluster'
    # Pango markup for the notice; %s receives the formatted timestamp.
    NOTICE_MARKUP = ('Software updates are being applied.\n\n'
                     'This workstation is temporarily unavailable.\n\n'
                     'Please use another workstation.\n\n'
                     '(Update started at %s)')
    TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M"

    def __init__(self, options):
        """Build the window and start monitoring.

        options must provide the boolean attributes ``debug`` (monitor
        regardless of machine type and print file events) and ``guitest``
        (show the window immediately).
        """
        self.debug = options.debug
        self.guitest = options.guitest
        self.sessionEnding = False
        self.sessionBus = dbus.SessionBus()
        # Always define these so the attributes exist even when
        # registration with the session manager fails.
        self.smClientId = None
        self.smClient = None
        try:
            self.register_with_sm()
            self.init_sm_client()
        except Exception:
            # Best effort: carry on without session management.  Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            # A single parenthesised argument prints identically whether
            # print is a statement or a function.
            print("Warning: Cannot register with session manager.")

        self._build_window()
        if self.guitest:
            self.window.show_all()

        monitoring = (self.debug or
                      self._detect_metapackage() == self.CLUSTER_METAPACKAGE)
        if monitoring:
            # Install the monitor before checking for the file, so that a
            # file created between the two steps is still noticed.
            self.gfile = gio.File(self.NOLOGIN_FILE)
            self.monitor = self.gfile.monitor_file(gio.FILE_MONITOR_NONE,
                                                   None)
            self.monitor.connect("changed", self.directory_changed)
            if os.path.isfile(self.NOLOGIN_FILE):
                self.window.show_all()

    @classmethod
    def _notice_markup(cls, started=None):
        """Return the notice markup stamped with a start time.

        started is a time tuple as accepted by time.strftime; when omitted
        the current local time is used.
        """
        if started is None:
            started = time.localtime()
        return cls.NOTICE_MARKUP % time.strftime(cls.TIMESTAMP_FORMAT,
                                                 started)

    def _build_window(self):
        """Create the (initially hidden) notice window and its label."""
        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.set_property('can-focus', False)
        box = gtk.VBox()
        self.label = gtk.Label()
        self.label.set_markup(self._notice_markup())
        self.label.set_property('can-focus', False)
        self.label.set_justify(gtk.JUSTIFY_CENTER)
        box.pack_start(self.label, True, True, 5)
        self.window.add(box)
        self.window.set_size_request(800, 600)
        self.window.set_decorated(False)
        self.window.set_position(gtk.WIN_POS_CENTER)
        self.window.hide()

    def _detect_metapackage(self):
        """Return the output of `machtype -L` with trailing space removed.

        If the command cannot be run (OSError), assume a cluster machine.
        """
        try:
            return subprocess.Popen(
                ["machtype", "-L"],
                stdout=subprocess.PIPE).communicate()[0].rstrip()
        except OSError:
            # Assume cluster
            return self.CLUSTER_METAPACKAGE

    def directory_changed(self, monitor, file1, file2, evt_type):
        """Handle a "changed" signal from the file monitor.

        Shows the window (with a fresh timestamp) when the file is created
        and hides it when the file is deleted; other events are ignored.
        """
        if self.debug:
            print("%s %s" % (str(evt_type), file1.get_path()))
        if evt_type == gio.FILE_MONITOR_EVENT_CREATED:
            self.label.set_markup(self._notice_markup())
            self.window.show_all()
        elif evt_type == gio.FILE_MONITOR_EVENT_DELETED:
            self.window.hide()

    # Connect to the session manager, and register our client.
    def register_with_sm(self):
        """Register with the session manager and store the client id."""
        proxy = self.sessionBus.get_object(SM_DBUS_NAME, SM_DBUS_PATH)
        sm = dbus.Interface(proxy, SM_DBUS_INTERFACE)
        autostart_id = os.getenv("DESKTOP_AUTOSTART_ID", default="")
        self.smClientId = sm.RegisterClient(APP_ID, autostart_id)

    # Set up to handle signals from the session manager.
    def init_sm_client(self):
        """Connect handlers to the signals of our registered client object.

        Requires register_with_sm() to have stored the client id.
        """
        proxy = self.sessionBus.get_object(SM_DBUS_NAME, self.smClientId)
        self.smClient = dbus.Interface(proxy, SM_CLIENT_DBUS_INTERFACE)
        self.smClient.connect_to_signal("QueryEndSession",
                                        self.sm_on_QueryEndSession)
        self.smClient.connect_to_signal("EndSession", self.sm_on_EndSession)
        self.smClient.connect_to_signal("CancelEndSession",
                                        self.sm_on_CancelEndSession)
        self.smClient.connect_to_signal("Stop", self.sm_on_Stop)

    # Here on a QueryEndSession signal from the session manager.
    def sm_on_QueryEndSession(self, flags):
        """Note that the session is ending and reply that this is OK."""
        self.sessionEnding = True
        # Response args: is_ok, reason.
        self.smClient.EndSessionResponse(True, "")

    # Here on an EndSession signal from the session manager.
    def sm_on_EndSession(self, flags):
        """Note that the session is ending and acknowledge the signal."""
        self.sessionEnding = True
        # Response args: is_ok, reason.
        self.smClient.EndSessionResponse(True, "")

    # Here on a CancelEndSession signal from the session manager.
    def sm_on_CancelEndSession(self):
        """Clear the session-ending flag."""
        self.sessionEnding = False

    # Here on a Stop signal from the session manager.
    def sm_on_Stop(self):
        """Leave the GTK main loop."""
        gtk.main_quit()
def main(options):
    """Create the applet and run the GTK main loop until it quits.

    options is the parsed command-line options object handed on to
    GDMSucks.
    """
    # Install the GLib main loop as the D-Bus default before GDMSucks
    # opens its session bus connection, so signals are dispatched by
    # gtk.main().
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    # Hold a reference for as long as the main loop runs, so the applet's
    # lifetime does not depend solely on its callback registrations.
    applet = GDMSucks(options)
    gtk.main()
    del applet
if __name__ == '__main__':
    # Both flags are off unless given on the command line:
    #   --test      sets options.debug
    #   --test-gui  sets options.guitest
    parser = OptionParser()
    parser.add_option("--test", dest="debug", action="store_true",
                      default=False)
    parser.add_option("--test-gui", dest="guitest", action="store_true",
                      default=False)
    options, args = parser.parse_args()
    main(options)