source: mergebot/trunk/mergebot/mergebotdaemon.py @ 18

Last change on this file since 18 was 18, checked in by retracile, 16 years ago

Mergebot: A merge operation targeting our version blocks us

File size: 13.8 KB
Line 
1#!/usr/bin/python
2"""Daemon that performs mergebot operations and manages the associated work.
3"""
4
5import sys
6import socket
7import select
8import os
9import time
10import trac
11import base64
12
13from threading import Thread
14
15from mergebot.BranchActor import BranchActor
16from mergebot.RebranchActor import RebranchActor
17from mergebot.CheckMergeActor import CheckMergeActor
18from mergebot.MergeActor import MergeActor
19
20from mergebot.daemonize import daemonize
21
22
23class Task(object):
24    """Represents a task in the queue with its dependencies.
25    """
26    task_id = 0
27    task_name_to_actor = {
28        'merge': MergeActor,
29        'branch': BranchActor,
30        'rebranch': RebranchActor,
31        'checkmerge': CheckMergeActor,
32    }
33    def __init__(self, master, ticket, action, component, version, summary,
34                 requestor):
35        self.id = Task.task_id = Task.task_id + 1
36        self.master = master
37        self.action = action.lower()
38        if action not in Task.task_name_to_actor.keys():
39            raise Exception('Invalid task type %s' % action)
40        self.ticket = ticket
41        self.component = component
42        self.version = version
43        self.summary = summary
44        self.requestor = requestor
45        self.blocked_by = []
46        self._queued = False
47        self._running = False
48        self._completed = False
49
50    def find_blockers(self, tasks):
51        """Adds existing tasks that block this task to this task's blocked_by
52        list.
53        """
54        for task in tasks:
55            # Tasks for different components are completely independent
56            if task.component == self.component:
57                is_blocked = False
58                # if self.version is a ticket ID, then any action other than
59                # 'checkmerge' on the parent ticket blocks this action
60                if self.version.startswith('#'):
61                    parent_ticket = int(self.version.lstrip('#'))
62                    if task.ticket == parent_ticket and task.action != 'checkmerge':
63                        is_blocked = True
64                # Any action other than checkmerge on a child ticket of ours blocks us
65                if task.version.startswith('#'):
66                    parent_ticket = int(task.version.lstrip('#'))
67                    if self.ticket == parent_ticket and task.action != 'checkmerge':
68                        is_blocked = True
69                # If (re)branching, then blocked by other tickets that are merging _to_ self.version
70                if self.action == 'branch' or self.action == 'rebranch':
71                    if task.action == 'merge' and task.version == self.version:
72                        is_blocked = True
73                # A merge operation targeting our version blocks us
74                if task.action == 'merge' and task.version == self.version:
75                    is_blocked = True
76                # If there is another queued operation for this same ticket,
77                # that task blocks this one
78                if self.ticket == task.ticket:
79                    is_blocked = True
80
81                if is_blocked:
82                    self.blocked_by.append(task)
83        return len(self.blocked_by)
84
85    def other_task_completed(self, task):
86        """Remove the given task from this task's blocked_by list.
87        """
88        if task in self.blocked_by:
89            self.blocked_by.remove(task)
90        return len(self.blocked_by)
91
92    def queued(self):
93        """Mark this task ask queued to run.
94        """
95        self._queued = True
96
97    def started(self):
98        """Mark this task as running/started.
99        """
100        self._running = True
101
102    def completed(self):
103        """Mark this task as completed/zombie.
104        """
105        self._completed = True
106
107    def get_state(self):
108        """Return a single-character indicator of this task's current state.
109
110        Tasks are:
111            Pending if they are ready to run.
112            Waiting if they are blocked by another task.
113            Running if they are currently being done.
114            Zombie if they have been completed.
115        """
116        if self._completed:
117            state = 'Z'#ombie
118        elif self._running:
119            state = 'R'#unning
120        elif self._queued:
121            state = 'Q'#ueued
122        elif self.blocked_by:
123            state = 'W'#aiting
124        else:
125            state = 'P'#ending
126        return state
127
128    def execute(self):
129        """Performs the actions for this task.
130        """
131        work_dir = os.path.join(self.master.work_dir, 'worker-%s' % self.id)
132        actor = Task.task_name_to_actor[self.action](work_dir,
133            self.master.repo_url, self.master.repo_dir, self.ticket,
134            self.component, self.version, self.summary, self.requestor)
135        self.results, self.result_comment, self.success = actor.execute()
136       
137    def __str__(self):
138        summary = base64.b64encode(self.summary)
139        return ','.join([str(e) for e in (self.id, self.get_state(),
140            self.ticket, self.action, self.component, self.version,
141            self.requestor, summary)])
142
143
144class Worker(object):
145    """Thread to do the work for an operation; has a work area it is
146    responsible for.
147    """
148    def __init__(self, num, work_dir):
149        self.number = num
150        self.work_dir = work_dir
151        self.task = None
152        self.inbox_read, self.inbox_write = os.pipe()
153        self.notifier_read, self.notifier_write = os.pipe()
154        self.thread = Thread(target=self.work)
155        self.thread.setDaemon(True)
156        self.thread.start()
157
158    def queue(self, task):
159        task.queued()
160        self.task = task
161        os.write(self.inbox_write, 'q')
162
163    def _dequeue(self):
164        os.read(self.inbox_read, 1)
165
166    def _completed(self):
167        self.task.completed()
168        os.write(self.notifier_write, 'd')
169
170    def ack_complete(self):
171        os.read(self.notifier_read, 1)
172        return self.task
173
174    def notifier(self):
175        return self.notifier_read
176
177    def work(self):
178        while True:
179            # get a task -- blocking read on pipe?
180            self._dequeue()
181            # do the task
182            log_filename = os.path.join(self.work_dir, 'worker.%s' % self.number)
183            open(log_filename, 'a').write(str(self.task) + ' started %s\n' % time.time())
184            self.task.started()
185            self.task.execute()
186            open(log_filename, 'a').write(str(self.task) + ' completed %s\n' % time.time())
187            # notify master of completion -- write to pipe?
188            self._completed()
189
190
191class Mergebot(object):
192    # Maybe I should just pass this the trac environment dir and have it create
193    # an environment, then pull config info from that.
194    def __init__(self, trac_dir):
195        self.trac_dir = os.path.abspath(trac_dir)
196        self.trac_env = trac.env.open_environment(self.trac_dir)
197        config = self.trac_env.config
198        self.listen_on = (config.get('mergebot', 'listen.ip'),
199                          config.getint('mergebot', 'listen.port'))
200        self.work_dir = config.get('mergebot', 'work_dir')
201        if not os.path.isabs(self.work_dir):
202            self.work_dir = os.path.join(self.trac_dir, self.work_dir)
203        self.repo_url = config.get('mergebot', 'repository_url')
204        repo_dir = config.get('trac', 'repository_dir')
205        if not os.path.isabs(repo_dir):
206            repo_dir = os.path.join(self.trac_dir, repo_dir)
207        self.repo_dir = repo_dir
208
209        self.listening = None
210        self.worker_count = config.getint('mergebot', 'worker_count')
211        self.task_list = []
212
213    def run(self):
214        """Run forever, handling requests.
215        """
216        self.listening = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
217        self.listening.bind(self.listen_on)
218        self.listening.listen(10)
219        open_sockets = []
220        workers = [Worker(i, work_dir=self.work_dir) for i in range(self.worker_count)]
221        active = []
222        try:
223            while True:
224                fds = [self.listening] + open_sockets + [w.notifier() for w in
225                                                         active]
226                readable, _writeable, _other = select.select(fds, [], [])
227                for s in readable:
228                    if s is self.listening:
229                        new_socket, _address = self.listening.accept()
230                        open_sockets.append(new_socket)
231                    elif s in open_sockets:
232                        data = s.recv(4096)
233                        for line in data.rstrip('\n').split('\n'):
234                            if line == '':
235                                open_sockets.remove(s)
236                                s.close()
237                            elif line[:4].upper() == 'QUIT':
238                                open_sockets.remove(s)
239                                s.close()
240                            else:
241                                response = self.handle_command(line)
242                                if response:
243                                    s.send(response)
244                                # I think we're going to want to make this a
245                                # single-shot deal
246                                #s.close()
247                                #open_sockets.remove(s)
248                    else:
249                        # Must be an active worker telling us it is done
250                        worker = [w for w in active if w.notifier() == s][0]
251                        task = worker.ack_complete()
252                        active.remove(worker)
253                        workers.insert(0, worker)
254                        # On failure, cancel all tasks that depend on this one.
255                        if not task.success:
256                            self._remove_dependant_tasks(task)
257                        self._remove_task(task)
258                        self._update_ticket(task)
259                # TODO: need to handle connections that the other end
260                # terminates?
261
262                # Assign a task to a worker
263                available_workers = list(workers)
264                pending = [t for t in self.task_list if t.get_state() == 'P']
265                for w, t in zip(available_workers, pending):
266                    w.queue(t)
267                    workers.remove(w)
268                    active.append(w)
269
270        except KeyboardInterrupt:
271            print 'Exiting due to keyboard interrupt.'
272        except Exception, e:
273            print 'Exiting due to: ', e
274            raise
275        self.listening.close()
276
277    def handle_command(self, command):
278        """Takes input from clients, and calls the appropriate sub-command.
279        """
280        parts = command.strip().split()
281        if not parts:
282            return '\n'
283        command = parts[0].upper()
284        args = parts[1:]
285
286        response = 'unrecognized command "%s"' % command
287        if command == 'LIST':
288            response = self.command_list()
289        elif command == 'ADD':
290            response = self.command_add(args)
291        elif command == 'CANCEL':
292            response = self.command_cancel(args)
293        # etc...
294        return response + '\n'
295
296    def command_list(self):
297        """Returns a listing of all active tasks.
298        """
299        listing = []
300        for task in self.task_list:
301            listing.append(str(task))
302        return '\n'.join(listing) + '\nOK'
303
304    def command_add(self, args):
305        # create a new task object and add it to the pool
306        try:
307            ticket, action, component, version, requestor = args
308        except ValueError:
309            return 'Error: wrong number of args: add <ticket> <action> ' \
310                '<component> <version> <requestor>\nGot: %r' % args
311        try:
312            ticket = int(ticket.strip('#'))
313        except ValueError:
314            return 'Error: invalid ticket number "%s"' % ticket
315
316        trac_ticket = trac.ticket.Ticket(self.trac_env, ticket)       
317        summary = trac_ticket['summary']
318        new_task = Task(self, ticket, action, component, version, summary,
319                        requestor)
320        new_task.find_blockers(self.task_list)
321        self.task_list.append(new_task)
322        # and trigger the worker threads if needed
323        return 'OK'
324
325    def command_cancel(self, args):
326        try:
327            tasknum = int(args[0])
328        except ValueError:
329            return 'Error'
330        except IndexError:
331            return 'Error'
332        found = [t for t in self.task_list if t.id == tasknum]
333        if len(found) != 1:
334            return 'Error: Not found'
335        dead = found[0]
336        dead_state = dead.get_state()
337        if dead_state not in ['W', 'P']:
338            return 'Error: Cannot kill task (state %s)' % dead_state
339        self._remove_dependant_tasks(dead)
340        self._remove_task(dead)
341        return 'OK'
342
343    def _remove_task(self, dead):
344        """Removes the given task from the queue.
345        Removes task as a dependency from any other tasks in the queue.
346        Assumes the task is in the task_list.
347        """
348        try:
349            self.task_list.remove(dead)
350        except ValueError:
351            self.trac_env.log.error("Task %s not in task_list when asked to remove" % dead)
352        for t in self.task_list:
353            if dead in t.blocked_by:
354                t.blocked_by.remove(dead)
355
356    def _remove_dependant_tasks(self, task):
357        for t in self.task_list:
358            if task in t.blocked_by:
359                self._remove_dependant_tasks(t)
360                self._remove_task(t)
361
362    def _update_ticket(self, task):
363        ticket = trac.ticket.Ticket(self.trac_env, task.ticket)       
364        if task.results or task.result_comment:
365            for key, value in task.results.items():
366                ticket[key] = value
367            ticket.save_changes('mergebot', task.result_comment)
368
369
370def main(args):
371    foreground = False
372    if args[0] == '-f':
373        foreground = True
374        args = args[1:]
375    trac_dir = args[0]
376    if not foreground:
377        daemonize()
378    bot = Mergebot(trac_dir)
379    bot.run()
380
381def run():
382    main(sys.argv[1:])
383
384if __name__ == '__main__':
385    run()
Note: See TracBrowser for help on using the repository browser.