1 module irc.eventloop;
2 
3 import irc.client;
4 import irc.dcc : DccConnection;
5 import irc.exception;
6 import irc.util : alloc, dealloc;
7 
8 import std.array;
9 import std.exception;
10 import std.socket;
11 
12 // libev is a world of pain on Windows - where it just
13 // uses select() internally anyway - but it pays off on
14 // other platforms.
15 import deimos.ev;
16 
17 version(Windows)
18 {
19 	import core.stdc.stdint : intptr_t;
20 
21 	extern(C) int _open_osfhandle(intptr_t osfhandle, int flags) nothrow;
22 
23 	// Dirk doesn't use this, just makes the linker happy
24 	extern(C) void* _stati64;
25 }
26 
27 private int getHandle(Socket socket)
28 {
29 	version(Windows)
30 	{
31 		auto handle = _open_osfhandle(socket.handle, 0);
32 		assert(handle != -1);
33 	}
34 	else
35 		auto handle = socket.handle;
36 
37 	return handle;
38 }
39 
40 //package alias IrcEventLoop.DccWatcher* DccEventIndex; // DMD bug?
41 package alias void* DccEventIndex;
42 
43 /**
44  * A collection of $(DPREF client, IrcClient) objects for efficiently handling incoming data.
45  */
46 class IrcEventLoop
47 {
48 	private:
49 	static struct Watcher
50 	{
51 		ev_io io; // Must be first.
52 		IrcEventLoop eventLoop;
53 	}
54 
55 	static struct DccWatcher
56 	{
57 		Watcher watcher;
58 		alias watcher this;
59 
60 		ev_timer timeoutTimer;
61 		ubyte[4] _padding; // emplace??
62 
63 		DccWatcher* _next, _prev;
64 	}
65 
66 	ev_loop_t* ev;
67 	Watcher[IrcClient] watchers;
68 	DccWatcher* dccWatchers;
69 
70 	ev_idle idleWatcher;
71 	void delegate()[] customMessages;
72 
73 	public:
74 	/**
75 	 * Create a new event loop.
76 	 */
77 	this()
78 	{
79 		this.ev = ev_loop_new(EVFLAG_AUTO);
80 	}
81 
82 	~this()
83 	{
84 		if(ev_is_active(&idleWatcher))
85 			ev_idle_stop(ev, &idleWatcher);
86 
87 		foreach(_, ref watcher; watchers)
88 			ev_io_stop(ev, &watcher.io);
89 
90 		ev_loop_destroy(ev);
91 
92 		for(auto watcher = dccWatchers; watcher != null;)
93 		{
94 			ev_io_stop(ev, &watcher.io);
95 
96 			auto next = watcher._next;
97 			dealloc(watcher);
98 			watcher = next;
99 		}
100 	}
101 
102 	private extern(C) static void callback(ev_loop_t* ev, ev_io* io, int revents)
103 	{
104 		auto client = cast(IrcClient)io.data;
105 		auto eventLoop = (cast(Watcher*)io).eventLoop;
106 
107 		bool wasClosed = true;
108 
109 		scope(exit)
110 		{
111 			if(wasClosed)
112 				eventLoop.remove(client);
113 		}
114 
115 		if(eventLoop.onError.empty) // Doesn't erase stacktrace this way
116 			wasClosed = client.read();
117 		else
118 		{
119 			try wasClosed = client.read();
120 			catch(Exception e)
121 			{
122 				foreach(handler; eventLoop.onError)
123 					handler(client, e);
124 			}
125 		}
126 	}
127 
128 	/**
129 	 * Invoked when an error occurs for a client
130 	 * in the set.
131 	 *
132 	 * If no handlers are registered,
133 	 * the error will be propagated out of
134 	 * $(MREF IrcEventLoop.run). The client
135 	 * will always be removed from the set.
136 	 * Throwing from a handler is allowed but
137 	 * will cause any subsequent registered handlers
138 	 * not to be called and the exception will
139 	 * keep propagating.
140 	 */
141 	void delegate(IrcClient, Exception)[] onError;
142 
143 	/**
144 	 * Add a connected _client to the set, or do nothing
145 	 * if the _client is already in the set.
146 	 *
147 	 * The _client is automatically removed
148 	 * if it is disconnected inside an event
149 	 * callback registered on the _client.
150 	 * If the _client is disconnected outside
151 	 * the event loop, it is the caller's
152 	 * responsibility to call $(MREF IrcEventLoop.remove).
153 	 * Params:
154 	 *   client = _client to _add
155 	 * Throws:
156 	 *   $(DPREF exception, UnconnectedClientException) if client is not connected.
157 	 */
158 	void add(IrcClient client)
159 	{
160 		enforceEx!UnconnectedClientException(
161 		    client.connected, "client to be added must be connected");
162 
163 		if(client in watchers)
164 			return;
165 
166 		watchers[client] = Watcher();
167 		auto watcher = client in watchers;
168 		watcher.io.data = cast(void*)client;
169 		watcher.eventLoop = this;
170 
171 		ev_io_init(&watcher.io, &callback, getHandle(client.socket), EV_READ);
172 		ev_io_start(ev, &watcher.io);
173 	}
174 
175 	/*
176 	 * DCC events
177 	 */
178 	private extern(C) static void dccCallback(ev_loop_t* ev, ev_io* io, int revents)
179 	{
180 		auto dcc = cast(DccConnection)io.data;
181 		auto watcher = cast(DccWatcher*)io;
182 		auto eventLoop = watcher.eventLoop;
183 
184 		DccConnection.Event dccEvent;
185 
186 		try dccEvent = dcc.read();
187 		catch(Exception e)
188 		{
189 			eventLoop.remove(dcc.eventIndex);
190 
191 			foreach(callback; dcc.onError)
192 				callback(e);
193 
194 			return;
195 		}
196 
197 		final switch(dccEvent) with(DccConnection.Event)
198 		{
199 			case none:
200 				break;
201 			case connectionEstablished: // dcc.socket should now contain client
202 				ev_timer_stop(ev, &watcher.timeoutTimer);
203 				ev_io_stop(ev, io);
204 				ev_io_set(io, getHandle(dcc.socket), EV_READ);
205 				ev_io_start(ev, io);
206 				break;
207 			case finished:
208 				eventLoop.remove(dcc.eventIndex);
209 				break;
210 		}
211 	}
212 
213 	private extern(C) static void dccTimeout(ev_loop_t* ev, ev_timer* timer, int revents)
214 	{
215 		auto dcc = cast(DccConnection)timer.data;
216 		auto watcher = cast(DccWatcher*)dcc.eventIndex;
217 		auto eventLoop = watcher.eventLoop;
218 
219 		scope(exit) eventLoop.remove(watcher);
220 
221 		dcc.doTimeout();
222 	}
223 
224 	package DccEventIndex add(DccConnection conn)
225 	{
226 		auto watcher = alloc!DccWatcher();
227 		watcher.io.data = cast(void*)conn;
228 		watcher.timeoutTimer.data = cast(void*)conn;
229 		watcher.eventLoop = this;
230 
231 		ev_io_init(&watcher.io, &dccCallback, getHandle(conn.socket), EV_READ);
232 		ev_io_start(ev, &watcher.io);
233 
234 		ev_timer_init(&watcher.timeoutTimer, &dccTimeout, conn.timeout, 0);
235 		ev_timer_start(ev, &watcher.timeoutTimer);
236 
237 		auto prevHead = dccWatchers;
238 		dccWatchers = watcher;
239 
240 		watcher._next = prevHead;
241 		if(prevHead) prevHead._prev = watcher;
242 
243 		return watcher;
244 	}
245 
246 	package void remove(DccEventIndex dccEvent)
247 	{
248 		DccWatcher* watcher = cast(DccWatcher*)dccEvent;
249 
250 		auto prev = watcher._prev;
251 		auto next = watcher._next;
252 
253 		ev_io_stop(ev, &watcher.io);
254 		dealloc(watcher);
255 
256 		if(prev) prev._next = next;
257 		if(next) next._prev = prev;
258 
259 		if(dccWatchers == watcher)
260 			dccWatchers = null;
261 	}
262 
263 	/**
264 	 * Remove a _client from the set, or do nothing if the _client is not in the set.
265 	 * Params:
266 	 *   client = _client to _remove
267 	 */
268 	void remove(IrcClient client)
269 	{
270 		if(auto watcher = client in watchers)
271 		{
272 			ev_io_stop(ev, &watcher.io);
273 			watchers.remove(client);
274 		}
275 	}
276 
277 	// Idle events
278 	private extern(C) static void onIdle(ev_loop_t* ev, ev_idle* watcher, int revents)
279 	{
280 		auto eventLoop = (cast(IrcEventLoop)watcher.data);
281 
282 		while(!eventLoop.customMessages.empty)
283 		{
284 			auto cb = eventLoop.customMessages.front;
285 			eventLoop.customMessages.popFront();
286 			cb();
287 		}
288 
289 		ev_idle_stop(ev, watcher);
290 	}
291 
292 	/**
293 	 * Run the specified callback at the next idle event.
294 	 */
295 	void post(void delegate() callback)
296 	{
297 		customMessages ~= callback;
298 
299 		auto watcher = &idleWatcher;
300 
301 		if(!ev_is_active(watcher))
302 		{
303 			watcher.data = cast(void*)this;
304 			ev_idle_init(watcher, &onIdle);
305 			ev_idle_start(ev, watcher);
306 		}
307 	}
308 
309 	private static struct CustomTimer
310 	{
311 		ev_timer timer;
312 		void delegate() callback;
313 	}
314 
315 	private extern(C) static void onCustomTimeout(ev_loop_t* ev, ev_timer* timer, int revents)
316 	{
317 		import core.memory : GC;
318 		auto customTimer = cast(CustomTimer*)timer;
319 
320 		//scope(exit) dealloc(customTimer);
321 
322 		if(customTimer.timer.repeat == 0)
323 			GC.removeRoot(timer);
324 
325 		customTimer.callback();
326 	}
327 
328 	enum TimerRepeat { yes, no }
329 
330 	struct Timer
331 	{
332 		private:
333 		IrcEventLoop eventLoop;
334 		CustomTimer* timer;
335 
336 		public:
337 		void stop()
338 		{
339 			enforce(active);
340 			ev_timer_stop(eventLoop.ev, &timer.timer);
341 			timer = null;
342 		}
343 
344 		bool active() @property
345 		{
346 			return timer !is null && ev_is_active(&timer.timer);
347 		}
348 
349 		TimerRepeat repeat() @property
350 		{
351 			enforce(active);
352 			with(TimerRepeat) return timer.timer.repeat == 0? no : yes;
353 		}
354 
355 		bool opCast(T)() if(is(T == bool))
356 		{
357 			return active;
358 		}
359 	}
360 
361 	/**
362 	* Run the specified callback as soon as possible after $(D time)
363 	* has elapsed.
364 	*
365 	* Equivalent to $(D postTimer(callback, time, TimerRepeat.no)).
366 	*/
367 	Timer post(void delegate() callback, double time)
368 	{
369 		return postTimer(callback, time, TimerRepeat.no);
370 	}
371 
372 	/**
373 	 * Run $(D callback) at every $(D interval), or just once after $(D interval)
374 	 * time has elapsed if $(D repeat) is $(D TimerRepeat.no).
375 	 */
376 	Timer postTimer(void delegate() callback, double interval, TimerRepeat repeat)
377 	{
378 		import core.memory : GC;
379 
380 		enforce(callback);
381 		enforce(interval >= 0);
382 
383 		//auto watcher = alloc!CustomTimer(); // TODO: use more efficient memory management
384 		auto watcher = new CustomTimer();
385 		watcher.callback = callback;
386 
387 		double repeatTime = repeat == TimerRepeat.yes? interval : 0.0;
388 
389 		ev_timer_init(&watcher.timer, &onCustomTimeout, interval, repeatTime);
390 		ev_timer_start(ev, &watcher.timer);
391 
392 		GC.addRoot(watcher);
393 
394 		return Timer(this, watcher);
395 	}
396 
397 	/**
398 	 * Handle incoming data for the clients in the set.
399 	 *
400 	 * The incoming data is handled by the respective client,
401 	 * and callbacks are called.
402 	 * Returns when all clients are no longer connected,
403 	 * or immediately if there are no clients in the set.
404 	 */
405 	void run()
406 	{
407 		ev_run(ev, 0);
408 	}
409 }