module irc.linebuffer;

import irc.exception;
import irc.protocol;

import std.exception;
import std.socket;
import core.stdc.string : memmove;

debug(Dirk) static import std.stdio;

/**
 * Splits bytes that the caller writes into a fixed buffer into lines,
 * handing every complete line to a callback.
 *
 * Usage: write received bytes into the buffer starting at $(D position),
 * then call $(D commit) with the number of bytes written.
 */
struct IncomingLineBuffer
{
    private:
    char[] buffer;
    size_t lineStart; // start of the line currently being assembled
    size_t bufferPos; // first committed byte not yet scanned for '\n'
    size_t dataEnd;   // one past the last committed byte
    void delegate(in char[] line) onReceivedLine;

    public:
    /**
     * Params:
     *   buffer = storage the caller fills with incoming bytes
     *   onReceivedLine = invoked once per complete line, without the
     *                    trailing "\n" or "\r\n"
     */
    this(char[] buffer, void delegate(in char[] line) onReceivedLine)
    {
        this.buffer = buffer;
        this.onReceivedLine = onReceivedLine;
    }

    /// Index into the buffer at which the next incoming bytes must be written
    /// (one past the last committed byte).
    size_t position() @property
    {
        return dataEnd;
    }

    /// Notify that n number of bytes have been committed after the current position.
    /// Call with $(D n = 0) to invoke the callback for any lines that were skipped
    /// due to an exception being thrown during a previous commit.
    ///
    /// Throws: $(D IrcParseErrorException) if the buffer is completely full and
    /// does not contain a single line terminator; anything thrown by the
    /// callback is propagated unchanged.
    void commit(size_t n)
    {
        assert(n <= buffer.length - dataEnd, "committed past the end of the buffer");
        dataEnd += n;

        {
            // A throwing callback must not leave a full buffer behind, or the
            // caller would have no room to receive into. At least one line
            // has been consumed when a callback throws, so lineStart > 0 and
            // compaction always frees space here.
            scope(failure)
            {
                if(dataEnd == buffer.length)
                    moveDown();
            }

            while(bufferPos < dataEnd)
            {
                immutable i = bufferPos++;

                if(buffer[i] != '\n')
                    continue;

                auto line = buffer[lineStart .. i];

                if(line.length > 0 && line[$ - 1] == '\r')
                    line = line[0 .. $ - 1]; // Skip \r

                // Both lineStart and bufferPos already point past the '\n'
                // before the callback runs: if it throws, only this line is
                // skipped and the following lines are parsed on the next
                // commit.
                lineStart = i + 1;

                onReceivedLine(line);
            }
        }

        // Compact only after scanning, so that a buffer filled entirely by
        // complete lines is not mistaken for one over-long line.
        if(dataEnd == buffer.length)
        {
            enforce!IrcParseErrorException(lineStart != 0, "line too long for buffer");
            moveDown();
        }
    }

    private:
    /// Move the unconsumed bytes (the partial line and any unscanned data)
    /// to the front of the buffer and rebase all offsets accordingly.
    void moveDown()
    {
        immutable pending = dataEnd - lineStart;
        memmove(buffer.ptr, buffer.ptr + lineStart, pending);
        bufferPos -= lineStart;
        dataEnd = pending;
        lineStart = 0;
    }
}

/**
 * Builds lines of the form $(D "<command> <target> :<message>\r\n") in a
 * fixed internal buffer and sends them over a socket.
 *
 * The command/target prefix is written once at construction; the message part
 * is filled via $(D messageBuffer)/$(D commit) or $(D consume) and sent with
 * $(D flush).
 */
struct OutgoingLineBuffer
{
    private:
    Socket socket;

    version(unittest)
        char["PRIVMSG #test :0123456789ABCDEF\r\n".length] lineBuffer;
    else
        char[IRC_MAX_LEN - MAX_USERHOST_LEN] lineBuffer = void;

    // Offsets into lineBuffer rather than slices of it: a struct value may be
    // bit-moved, which would leave slices into its own storage dangling.
    size_t messageStart; // first byte of the message area
    size_t tailStart;    // first unused byte of the message area

    /// Unused part of the message area; the last two bytes of lineBuffer
    /// stay reserved for "\r\n".
    char[] tail()
    {
        return lineBuffer[tailStart .. $ - 2];
    }

    /// Mark n more bytes of the message area as used (bounds-checked by the slice).
    void advance(size_t n)
    {
        auto remaining = tail()[n .. $];
        tailStart = lineBuffer.length - 2 - remaining.length;
    }

    public:
    @disable this();
    @disable this(this);

    /**
     * Params:
     *   socket = socket that $(D flush) sends on
     *   command = command written at the start of every line
     *   target = target written after the command
     */
    this(Socket socket, in char[] command, in char[] target)
    {
        this.socket = socket;

        lineBuffer[0 .. command.length] = command;

        immutable targetStart = command.length + 1;
        lineBuffer[command.length .. targetStart] = ' ';

        immutable targetEnd = targetStart + target.length;
        lineBuffer[targetStart .. targetEnd] = target;

        this.messageStart = targetEnd + 2;
        lineBuffer[targetEnd .. this.messageStart] = " :";

        // Slice once so that an oversized prefix still fails the bounds check.
        auto messageArea = lineBuffer[this.messageStart .. $ - 2];
        this.tailStart = lineBuffer.length - 2 - messageArea.length;
    }

    /// Number of message bytes that can still be added before flushing.
    size_t capacity() @property
    {
        return lineBuffer.length - 2 - tailStart;
    }

    /// Whether any message bytes have been added since the last flush.
    bool hasMessage() @property
    {
        return tailStart != messageStart;
    }

    /// The whole message area (both used and unused parts).
    char[] messageBuffer() @property
    {
        return lineBuffer[messageStart .. $ - 2];
    }

    /// Mark i bytes, already written directly after the used part of the
    /// message area, as used.
    void commit(size_t i)
    {
        advance(i);
    }

    /// Copy the first n bytes of source into the message area and
    /// remove them from the front of source.
    void consume(ref const(char)[] source, size_t n)
    {
        tail()[0 .. n] = source[0 .. n];
        advance(n);
        source = source[n .. $];
    }

    /// Terminate the line with "\r\n", send it and reset the message area.
    void flush()
    {
        immutable sansNewlineLength = tailStart;
        immutable fullLength = sansNewlineLength + 2;
        lineBuffer[sansNewlineLength .. fullLength] = "\r\n";
        debug(Dirk) std.stdio.writefln(`<< "%s" (length: %s)`, lineBuffer[0 .. sansNewlineLength], sansNewlineLength);
        // NOTE(review): the return value of send is not inspected, so a short
        // write or an error goes unnoticed here.
        socket.send(lineBuffer[0 .. fullLength]);
        tailStart = messageStart;
    }
}