Package dogtail :: Module sessions
[hide private]
[frames] | no frames]

Source Code for Module dogtail.sessions

  1  import time 
  2  import os 
  3  import pwd 
  4  import errno 
  5  import re 
  6  import subprocess 
  7  import signal 
  8  import tempfile 
  9  import random 
 10  import glob 
 11  from dogtail.config import config 
def scratchFile(label):  # pragma: no cover
    """Create a uniquely named temporary file inside config.scratchDir.

    The file name has the form "dogtail-headless-LABEL.RANDOM", where LABEL
    is the given label and RANDOM is the suffix chosen by the tempfile
    module.

    Returns the open tempfile.NamedTemporaryFile object. No mode is passed,
    so the file is opened with NamedTemporaryFile's default mode.
    """
    namePrefix = "dogtail-headless-%s." % (label,)
    scratch = tempfile.NamedTemporaryFile(
        prefix=namePrefix,
        dir=config.scratchDir,
    )
    return scratch
22 23 -def testBinary(path): # pragma: no cover
24 if (path.startswith(os.path.sep) or 25 path.startswith(os.path.join('.', '')) or 26 path.startswith(os.path.join('..', ''))): 27 if not os.path.exists(path): 28 raise IOError(errno.ENOENT, "No such file", path) 29 if not os.access(path, os.X_OK): 30 raise IOError(errno.ENOEXEC, "Permission denied", path) 31 return True 32
def get_username():  # pragma: no cover
    """Return the login name registered for the current process's user id."""
    passwdEntry = pwd.getpwuid(os.getuid())
    return passwdEntry.pw_name
class Subprocess(object):  # pragma: no cover
    """Small wrapper around subprocess.Popen for a single child command.

    start() must be called before wait(), stop() or exitCode are used,
    because the Popen object is only created there.
    """

    def __init__(self, cmdList, environ=None):
        """Remember the command and environment without launching anything.

        cmdList -- the argument list to run; its first element is validated
                   with testBinary(), which may raise IOError.
        environ -- environment mapping for the child, or None to use
                   os.environ at start() time.
        """
        testBinary(cmdList[0])
        self.cmdList = cmdList
        self.environ = environ
        self._exitCode = None

    def start(self):
        """Launch the command and return the child's process id."""
        if self.environ is None:
            self.environ = os.environ
        self.popen = subprocess.Popen(self.cmdList, env=self.environ)
        return self.popen.pid

    def wait(self):
        """Block until the child exits and return its exit status."""
        return self.popen.wait()

    def stop(self):
        """Ask the child to terminate.

        Popen.terminate() is used instead of os.kill() on the raw pid so
        that a child which has already been waited for is not signalled
        again: its pid may be gone or may belong to an unrelated process.
        """
        self.popen.terminate()

    @property
    def exitCode(self):
        """Exit status of the child; waits for it on first access."""
        if self._exitCode is None:
            self._exitCode = self.wait()
        return self._exitCode
class XServer(Subprocess):  # pragma: no cover
    """Launches an X server, through xinit, on a display chosen at start."""

    def __init__(self, server="/usr/bin/Xorg",
                 xinitrc="/etc/X11/xinit/Xclients",
                 resolution="1024x768x16"):
        """Record the server configuration without launching anything.

        server     -- path of the X server binary; validated with
                      testBinary(), which may raise IOError.
        xinitrc    -- client script handed to xinit.
        resolution -- only used when the server binary is Xvfb.

        Subprocess.__init__ is not called here: cmdList is a read-only
        property on this class, so it cannot be assigned.
        """
        testBinary(server)
        self.server = server
        self._exitCode = None
        self.xinit = "/usr/bin/xinit"
        self.display = None
        self.xinitrc = xinitrc
        self.resolution = resolution

    @staticmethod
    def findFreeDisplay():
        """Return a display string (such as ':1') above every lock in /tmp.

        Display numbers are read from file names beginning with
        '.X<number>-lock'. ':0' is returned when no such file is present.
        """
        lockPattern = re.compile(r'\.X([0-9]+)-lock')
        usedDisplays = []
        for entry in os.listdir('/tmp'):
            match = lockPattern.match(entry)
            if match:
                usedDisplays.append(int(match.group(1)))
        if not usedDisplays:
            return ':0'
        return ':' + str(max(usedDisplays) + 1)

    @property
    def cmdList(self):
        """Build the command line to run.

        Every access picks a display again via findFreeDisplay() and stores
        it in self.display, so callers should read this property once.
        """
        self.display = self.findFreeDisplay()
        cmd = []
        if self.xinit:
            cmd.append(self.xinit)
            if self.xinitrc:
                cmd.append(self.xinitrc)
            cmd.append('--')
        cmd.append(self.server)
        cmd.append(self.display)
        cmd.extend(['-ac', '-noreset'])
        if self.server.endswith('Xvfb'):
            cmd.extend(['-screen', '0', self.resolution])
            cmd.append('-shmem')
        return cmd

    def start(self):
        """Print and launch the server command; return the child's pid."""
        # Evaluate the property once so that the printed command, the
        # launched command and self.display all refer to the same display.
        cmd = self.cmdList
        print(' '.join(cmd))
        self.popen = subprocess.Popen(cmd)
        return self.popen.pid
class Script(Subprocess):  # pragma: no cover
    """A Subprocess under another name; adds no behaviour of its own."""
class Session(object):  # pragma: no cover
    """Runs a desktop session in its own X server, then a script inside it.

    The session is tagged with a random cookie exported through the
    generated xinitrc. The cookie is later used to find the session's
    process under /proc and copy its environment for the script.
    """

    cookieName = "DOGTAIL_SESSION_COOKIE"

    def __init__(self, sessionBinary, scriptCmdList=None, scriptDelay=20, logout=True):
        """Prepare the session without starting anything.

        sessionBinary -- path of the session program; validated with
                         testBinary(), which may raise IOError.
        scriptCmdList -- command list for the script; None means an empty
                         list, which Script() is given unchanged.
        scriptDelay   -- seconds to sleep between starting the X server
                         and starting the script.
        logout        -- stored as self.logout; no method in this class
                         reads it.
        """
        testBinary(sessionBinary)
        if scriptCmdList is None:
            # Avoid sharing one default list object between instances.
            scriptCmdList = []
        self.sessionBinary = sessionBinary
        self.script = Script(scriptCmdList)
        self.scriptDelay = scriptDelay
        self.logout = logout
        self.xserver = XServer()
        self._cookie = None
        self._environment = None

    def start(self):
        """Start the X server and then the script.

        Returns a (xServerPid, scriptPid) tuple.
        """
        # The file object is kept on self so the temporary xinitrc stays
        # referenced while the session is running.
        self.xinitrcFileObj = scratchFile('xinitrc')
        self.xserver.xinitrc = self.xinitrcFileObj.name
        self._buildXInitRC(self.xinitrcFileObj)
        xServerPid = self.xserver.start()
        time.sleep(self.scriptDelay)
        self.script.environ = self.environment
        scriptPid = self.script.start()
        return (xServerPid, scriptPid)

    @property
    def environment(self):
        """Environment dict of the session process carrying our cookie.

        Scans /proc on every access. When several processes match, the
        last one found wins. If nothing matches on this access, a value
        found by an earlier access is returned instead.

        Raises RuntimeError when no environment has ever been found.
        """
        # startkde does not stay around as the session process, so look
        # for plasma-desktop instead.
        if self.sessionBinary.split('/')[-1] == 'startkde':
            expectedExe = '/usr/bin/plasma-desktop'
        else:
            expectedExe = self.sessionBinary
        ownPid = str(os.getpid())

        def isSessionProcess(procDir):
            """True if procDir ('/proc/<pid>/') runs the session binary."""
            try:
                if os.path.realpath(procDir + 'exe') != expectedExe:
                    return False
            except OSError:
                return False
            pid = procDir.split('/')[2]
            if pid == 'self' or pid == ownPid:
                return False
            return True

        def getEnvDict(fileName):
            """Parse a NUL-separated environ file; {} if it is unreadable."""
            try:
                with open(fileName, 'r') as envFile:
                    envString = envFile.read()
            except IOError:
                return {}
            envDict = {}
            for item in envString.split('\x00'):
                if '=' not in item:
                    continue
                k, v = item.split('=', 1)
                envDict[k] = v
            return envDict

        def isSessionEnv(envDict):
            """True if envDict carries this session's cookie."""
            if not envDict:
                return False
            return envDict.get(self.cookieName, 'notacookie') == self.cookie

        for path in glob.glob('/proc/*/'):
            if not isSessionProcess(path):
                continue
            envDict = getEnvDict(path + 'environ')
            if isSessionEnv(envDict):
                self._environment = envDict
        if not self._environment:
            raise RuntimeError("Can't find our environment!")
        return self._environment

    def wait(self):
        """Wait for the script, then the X server; return the latter's status."""
        self.script.wait()
        return self.xserver.wait()

    def stop(self):
        """Stop the script (ignoring OSError) and then the X server."""
        try:
            self.script.stop()
        except OSError:
            pass
        self.xserver.stop()

    def attemptLogout(self):
        """Run dogtail-logout inside the session and wait for it to finish."""
        # Script expects a command list; a bare string would have its
        # first character treated as the program name by testBinary().
        logoutScript = Script(['dogtail-logout'],
                              environ=self.environment)
        logoutScript.start()
        logoutScript.wait()

    @property
    def cookie(self):
        """Random hexadecimal tag for this session, created on first use."""
        if not self._cookie:
            self._cookie = "%X" % random.getrandbits(16)
        return self._cookie

    def _buildXInitRC(self, fileObj):
        """Write the xinitrc that exports the cookie and execs the session.

        fileObj may be a text-mode or a binary-mode file object; it is
        flushed but not closed.
        """
        lines = [
            "export %s=%s" % (self.cookieName, self.cookie),
            "gsettings set org.gnome.desktop.interface toolkit-accessibility true",
            ". /etc/X11/xinit/xinitrc-common",
            "export %s" % self.cookieName,
            "exec -l $SHELL -c \"$CK_XINIT_SESSION $SSH_AGENT %s\"" %
            (self.sessionBinary),
            ""]

        content = '\n'.join(lines).strip()
        try:
            fileObj.write(content)
        except TypeError:
            # Binary-mode files (the tempfile.NamedTemporaryFile default)
            # reject text on Python 3, so write the encoded form instead.
            fileObj.write(content.encode('utf-8'))
        fileObj.flush()