module irc.eventloop;

import irc.client;
import irc.dcc : DccConnection;
import irc.exception;
import irc.util : alloc, dealloc;

import std.array;
import std.exception;
import std.socket;

// libev is a world of pain on Windows - where it just
// uses select() internally anyway - but it pays off on
// other platforms.
import deimos.ev;

version(Windows)
{
    import core.stdc.stdint : intptr_t;

    extern(C) int _open_osfhandle(intptr_t osfhandle, int flags) nothrow;

    // Dirk doesn't use this, just makes the linker happy
    extern(C) void* _stati64;
}

/*
 * Get the integer descriptor that is handed to libev for the given socket.
 *
 * On Windows the socket's OS handle is passed through _open_osfhandle;
 * on other platforms the socket handle is used as-is.
 */
private int getHandle(Socket socket)
{
    version(Windows)
    {
        auto handle = _open_osfhandle(socket.handle, 0);
        assert(handle != -1);
    }
    else
        auto handle = socket.handle;

    return handle;
}

//package alias IrcEventLoop.DccWatcher* DccEventIndex; // DMD bug?
// Opaque handle to a registered DCC watcher; it is really a DccWatcher*.
package alias void* DccEventIndex;

/**
 * A collection of $(DPREF client, IrcClient) objects for efficiently handling incoming data.
 */
class IrcEventLoop
{
    private:
    static struct Watcher
    {
        // Must be first: the libev callbacks cast the ev_io* they
        // receive straight back to a Watcher* / DccWatcher*.
        ev_io io;
        IrcEventLoop eventLoop;
    }

    static struct DccWatcher
    {
        Watcher watcher;
        alias watcher this;

        ev_timer timeoutTimer;
        ubyte[4] _padding; // emplace??

        // Intrusive doubly-linked list of all live DCC watchers.
        DccWatcher* _next, _prev;
    }

    ev_loop_t* ev;
    Watcher[IrcClient] watchers;
    DccWatcher* dccWatchers; // head of the DCC watcher list

    ev_idle idleWatcher;
    void delegate()[] customMessages;

    public:
    /**
     * Create a new event loop.
     */
    this()
    {
        this.ev = ev_loop_new(EVFLAG_AUTO);
    }

    ~this()
    {
        if(ev_is_active(&idleWatcher))
            ev_idle_stop(ev, &idleWatcher);

        foreach(_, ref watcher; watchers)
            ev_io_stop(ev, &watcher.io);

        // Every watcher has to be stopped while the loop still exists,
        // so the DCC watchers are torn down before ev_loop_destroy.
        for(auto watcher = dccWatchers; watcher != null;)
        {
            auto next = watcher._next;

            ev_timer_stop(ev, &watcher.timeoutTimer);
            ev_io_stop(ev, &watcher.io);
            dealloc(watcher);

            watcher = next;
        }
        dccWatchers = null;

        ev_loop_destroy(ev);
    }

    /*
     * libev read callback for IrcClient sockets.
     *
     * The client is removed from the set when read() reports that the
     * connection was closed, or when read() throws.
     */
    private extern(C) static void callback(ev_loop_t* ev, ev_io* io, int revents)
    {
        auto client = cast(IrcClient)io.data;
        auto eventLoop = (cast(Watcher*)io).eventLoop;

        // Stays true if read() throws, so a failing client is always removed.
        bool wasClosed = true;

        scope(exit)
        {
            if(wasClosed)
                eventLoop.remove(client);
        }

        if(eventLoop.onError.empty) // Doesn't erase stacktrace this way
            wasClosed = client.read();
        else
        {
            try wasClosed = client.read();
            catch(Exception e)
            {
                foreach(handler; eventLoop.onError)
                    handler(client, e);
            }
        }
    }

    /**
     * Invoked when an error occurs for a client
     * in the set.
     *
     * If no handlers are registered,
     * the error will be propagated out of
     * $(MREF IrcEventLoop.run). The client
     * will always be removed from the set.
     * Throwing from a handler is allowed but
     * will cause any subsequent registered handlers
     * not to be called and the exception will
     * keep propagating.
     */
    void delegate(IrcClient, Exception)[] onError;

    /**
     * Add a connected _client to the set, or do nothing
     * if the _client is already in the set.
     *
     * The _client is automatically removed
     * if it is disconnected inside an event
     * callback registered on the _client.
     * If the _client is disconnected outside
     * the event loop, it is the caller's
     * responsibility to call $(MREF IrcEventLoop.remove).
     * Params:
     *   client = _client to _add
     * Throws:
     *   $(DPREF exception, UnconnectedClientException) if client is not connected.
     */
    void add(IrcClient client)
    {
        enforceEx!UnconnectedClientException(
            client.connected, "client to be added must be connected");

        if(client in watchers)
            return;

        // Insert first, then take the address of the stored entry:
        // libev keeps a pointer to the ev_io inside it.
        watchers[client] = Watcher();
        auto watcher = client in watchers;
        watcher.io.data = cast(void*)client;
        watcher.eventLoop = this;

        ev_io_init(&watcher.io, &callback, getHandle(client.socket), EV_READ);
        ev_io_start(ev, &watcher.io);
    }

    /*
     * DCC events
     */

    /*
     * libev read callback for DCC sockets.
     *
     * An exception from read() unregisters the connection and is then
     * forwarded to the connection's onError callbacks.
     */
    private extern(C) static void dccCallback(ev_loop_t* ev, ev_io* io, int revents)
    {
        auto dcc = cast(DccConnection)io.data;
        auto watcher = cast(DccWatcher*)io;
        auto eventLoop = watcher.eventLoop;

        DccConnection.Event dccEvent;

        try dccEvent = dcc.read();
        catch(Exception e)
        {
            eventLoop.remove(dcc.eventIndex);

            foreach(callback; dcc.onError)
                callback(e);

            return;
        }

        final switch(dccEvent) with(DccConnection.Event)
        {
            case none:
                break;
            case connectionEstablished: // dcc.socket should now contain client
                // Connected in time: drop the timeout and re-arm the
                // io watcher on the new socket.
                ev_timer_stop(ev, &watcher.timeoutTimer);
                ev_io_stop(ev, io);
                ev_io_set(io, getHandle(dcc.socket), EV_READ);
                ev_io_start(ev, io);
                break;
            case finished:
                eventLoop.remove(dcc.eventIndex);
                break;
        }
    }

    /*
     * libev timer callback fired when a DCC connection times out.
     * The connection is unregistered after doTimeout() returns or throws.
     */
    private extern(C) static void dccTimeout(ev_loop_t* ev, ev_timer* timer, int revents)
    {
        auto dcc = cast(DccConnection)timer.data;
        auto watcher = cast(DccWatcher*)dcc.eventIndex;
        auto eventLoop = watcher.eventLoop;

        scope(exit) eventLoop.remove(watcher);

        dcc.doTimeout();
    }

    /*
     * Register a DCC connection: start watching its socket for reads and
     * arm a one-shot timer of conn.timeout.
     *
     * Returns: an opaque index to pass to remove(DccEventIndex).
     */
    package DccEventIndex add(DccConnection conn)
    {
        auto watcher = alloc!DccWatcher();
        watcher.io.data = cast(void*)conn;
        watcher.timeoutTimer.data = cast(void*)conn;
        watcher.eventLoop = this;

        ev_io_init(&watcher.io, &dccCallback, getHandle(conn.socket), EV_READ);
        ev_io_start(ev, &watcher.io);

        ev_timer_init(&watcher.timeoutTimer, &dccTimeout, conn.timeout, 0);
        ev_timer_start(ev, &watcher.timeoutTimer);

        // Push onto the front of the list. Both links are set explicitly
        // rather than relying on alloc() returning zeroed memory.
        auto prevHead = dccWatchers;
        watcher._prev = null;
        watcher._next = prevHead;
        if(prevHead) prevHead._prev = watcher;
        dccWatchers = watcher;

        return watcher;
    }

    /*
     * Unregister a DCC connection previously registered with
     * add(DccConnection) and free its watcher. A null index is ignored.
     *
     * The index must not be used again after this call.
     */
    package void remove(DccEventIndex dccEvent)
    {
        DccWatcher* watcher = cast(DccWatcher*)dccEvent;

        if(watcher is null)
            return;

        auto prev = watcher._prev;
        auto next = watcher._next;

        // The timeout timer may still be armed (e.g. the connection failed
        // before it was established); it must not outlive the memory it
        // lives in.
        ev_timer_stop(ev, &watcher.timeoutTimer);
        ev_io_stop(ev, &watcher.io);

        if(prev) prev._next = next;
        if(next) next._prev = prev;

        // Removing the head must keep the rest of the list reachable.
        if(dccWatchers == watcher)
            dccWatchers = next;

        dealloc(watcher);
    }

    /**
     * Remove a _client from the set, or do nothing if the _client is not in the set.
     * Params:
     *   client = _client to _remove
     */
    void remove(IrcClient client)
    {
        if(auto watcher = client in watchers)
        {
            ev_io_stop(ev, &watcher.io);
            watchers.remove(client);
        }
    }

    // Idle events

    /*
     * libev idle callback: run every queued message (including ones
     * posted by the messages themselves), then stop the idle watcher.
     */
    private extern(C) static void onIdle(ev_loop_t* ev, ev_idle* watcher, int revents)
    {
        auto eventLoop = (cast(IrcEventLoop)watcher.data);

        while(!eventLoop.customMessages.empty)
        {
            // Pop before calling so a callback that posts again
            // does not see itself at the front of the queue.
            auto cb = eventLoop.customMessages.front;
            eventLoop.customMessages.popFront();
            cb();
        }

        ev_idle_stop(ev, watcher);
    }

    /**
     * Run the specified callback at the next idle event.
     */
    void post(void delegate() callback)
    {
        customMessages ~= callback;

        auto watcher = &idleWatcher;

        // The idle watcher is only running while there are queued messages.
        if(!ev_is_active(watcher))
        {
            watcher.data = cast(void*)this;
            ev_idle_init(watcher, &onIdle);
            ev_idle_start(ev, watcher);
        }
    }

    private static struct CustomTimer
    {
        ev_timer timer; // Must be first; see onCustomTimeout.
        void delegate() callback;
    }

    /*
     * libev timer callback for timers created by postTimer.
     */
    private extern(C) static void onCustomTimeout(ev_loop_t* ev, ev_timer* timer, int revents)
    {
        import core.memory : GC;
        auto customTimer = cast(CustomTimer*)timer;

        //scope(exit) dealloc(customTimer);

        // A one-shot timer will not fire again, so release the GC root
        // that postTimer added. The local pointer keeps the object
        // reachable for the duration of the callback.
        if(customTimer.timer.repeat == 0)
            GC.removeRoot(customTimer);

        customTimer.callback();
    }

    enum TimerRepeat { yes, no }

    /**
     * Handle to a timer created by $(D post) or $(D postTimer).
     */
    struct Timer
    {
        private:
        IrcEventLoop eventLoop;
        CustomTimer* timer;

        public:
        /**
         * Stop the timer. The timer must be active.
         */
        void stop()
        {
            import core.memory : GC;

            enforce(active);
            ev_timer_stop(eventLoop.ev, &timer.timer);

            // An active timer has not released its GC root yet
            // (only a fired one-shot timer does that itself).
            GC.removeRoot(timer);
            timer = null;
        }

        /**
         * Whether the timer is still scheduled to fire.
         */
        bool active() @property
        {
            return timer !is null && ev_is_active(&timer.timer);
        }

        /**
         * Whether the timer repeats. The timer must be active.
         */
        TimerRepeat repeat() @property
        {
            enforce(active);
            with(TimerRepeat) return timer.timer.repeat == 0? no : yes;
        }

        /// Same as $(D active).
        bool opCast(T)() if(is(T == bool))
        {
            return active;
        }
    }

    /**
     * Run the specified callback as soon as possible after $(D time)
     * has elapsed.
     *
     * Equivalent to $(D postTimer(callback, time, TimerRepeat.no)).
     */
    Timer post(void delegate() callback, double time)
    {
        return postTimer(callback, time, TimerRepeat.no);
    }

    /**
     * Run $(D callback) at every $(D interval), or just once after $(D interval)
     * time has elapsed if $(D repeat) is $(D TimerRepeat.no).
     */
    Timer postTimer(void delegate() callback, double interval, TimerRepeat repeat)
    {
        import core.memory : GC;

        enforce(callback);
        enforce(interval >= 0);

        //auto watcher = alloc!CustomTimer(); // TODO: use more efficient memory management
        auto watcher = new CustomTimer();
        watcher.callback = callback;

        double repeatTime = repeat == TimerRepeat.yes? interval : 0.0;

        // libev's reference to the timer is invisible to the GC, so root it
        // before handing it over. The root is released by onCustomTimeout
        // (one-shot timers) or by Timer.stop.
        GC.addRoot(watcher);

        ev_timer_init(&watcher.timer, &onCustomTimeout, interval, repeatTime);
        ev_timer_start(ev, &watcher.timer);

        return Timer(this, watcher);
    }

    /**
     * Handle incoming data for the clients in the set.
     *
     * The incoming data is handled by the respective client,
     * and callbacks are called.
     * Returns when all clients are no longer connected,
     * or immediately if there are no clients in the set.
     */
    void run()
    {
        ev_run(ev, 0);
    }
}