1 module irc.linebuffer;
2 
import irc.exception;
import irc.protocol;

import std.exception;
import std.socket;
import core.stdc.string : memmove;

debug(Dirk) static import std.stdio;
11 
/**
 * Splits bytes written into a caller-supplied buffer into lines.
 *
 * The caller writes received bytes into the buffer starting at
 * $(D position) and then calls $(D commit) with the number of bytes
 * written. Every complete line found (terminated by $(D '\n'), with an
 * optional preceding $(D '\r') removed) is passed to the callback.
 *
 * The slice handed to the callback points into the buffer and is only
 * valid for the duration of the call; the storage is reused afterwards.
 */
struct IncomingLineBuffer
{
	private:
	char[] buffer;

	// Invariant: lineStart <= scanPos <= bufferPos <= buffer.length.
	size_t lineStart; // Start of the line currently being assembled.
	size_t scanPos;   // Next byte to inspect for a line terminator.
	size_t bufferPos; // End of the committed data.

	void delegate(in char[] line) onReceivedLine;

	public:
	/**
	 * Params:
	 *   buffer = storage that incoming bytes are written into
	 *   onReceivedLine = callback invoked once per complete line,
	 *     without the trailing line terminator
	 */
	this(char[] buffer, void delegate(in char[] line) onReceivedLine)
	{
		this.buffer = buffer;
		this.onReceivedLine = onReceivedLine;
	}

	/// Offset into the buffer at which the next bytes should be written,
	/// i.e. the end of the data committed so far.
	size_t position() @property
	{
		return bufferPos;
	}

	/**
	 * Notify that n number of bytes have been committed after the current position.
	 * Call with $(D n = 0) to invoke the callback for any lines that were skipped
	 * due to an exception being thrown during a previous commit.
	 *
	 * If the callback throws, the line it was given is considered handled;
	 * all later committed bytes are kept and are processed by the next
	 * call to $(D commit). The exception is propagated to the caller.
	 *
	 * Throws:
	 *   $(D IrcParseErrorException) if the buffer is completely filled by a
	 *   single line that has no terminator yet.
	 */
	void commit(size_t n)
	{
		assert(n <= buffer.length - bufferPos, "commit past the end of the buffer");

		bufferPos += n;

		// If the callback throws while the buffer is full, still reclaim the
		// space of the already-handled lines so the caller can keep reading.
		// (A failure coming from moveDown itself implies lineStart == 0.)
		scope(failure)
		{
			if(bufferPos == buffer.length && lineStart != 0)
				compact();
		}

		while(scanPos < bufferPos)
		{
			immutable i = scanPos++;

			if(buffer[i] != '\n')
				continue;

			auto line = buffer[lineStart .. i];

			if(line.length > 0 && line[$ - 1] == '\r')
				line = line[0 .. $ - 1]; // Skip \r

			// Advance past the terminator before invoking the callback, so
			// that if it throws only the current line is skipped and the
			// remaining data is parsed on the next commit.
			lineStart = scanPos;

			onReceivedLine(line);
		}

		// Only compact after scanning: the bytes that filled the buffer may
		// themselves have completed one or more lines.
		if(bufferPos == buffer.length)
			moveDown();
	}

	private:
	/// Makes room at the end of a full buffer by moving the unfinished line
	/// to the front. Throws if the unfinished line occupies the whole buffer.
	void moveDown()
	{
		enforceEx!IrcParseErrorException(lineStart != 0, "line too long for buffer");
		compact();
	}

	/// Moves the bytes in $(D lineStart .. bufferPos) to the start of the
	/// buffer and rebases all offsets accordingly.
	void compact()
	{
		immutable pending = bufferPos - lineStart;
		memmove(buffer.ptr, buffer.ptr + lineStart, pending);
		scanPos -= lineStart;
		bufferPos = pending;
		lineStart = 0;
	}
}
81 
/**
 * Builds lines of the form $(D "<command> <target> :<message>\r\n") in a
 * fixed-size buffer and sends them over a socket.
 *
 * The $(D "<command> <target> :") prefix is written once on construction;
 * message text is appended with $(D consume), or written directly into
 * $(D messageBuffer) and acknowledged with $(D commit). $(D flush) sends
 * the line and resets the message part.
 *
 * Positions are stored as offsets into the internal buffer rather than as
 * slices of it, so the struct holds no pointers into itself and remains
 * valid if it is moved.
 */
struct OutgoingLineBuffer
{
	private:
	Socket socket;

	version(unittest)
		char["PRIVMSG #test :0123456789ABCDEF\r\n".length] lineBuffer;
	else
		char[IRC_MAX_LEN - MAX_USERHOST_LEN] lineBuffer = void;

	enum terminator = "\r\n";

	size_t messageStart; // Offset of the first message byte (end of the prefix).
	size_t tailStart;    // Offset at which the next message byte is written.

	public:
	@disable this();
	@disable this(this);

	/**
	 * Params:
	 *   socket = socket that $(D flush) sends lines on
	 *   command = command written at the start of every line
	 *   target = target written after the command, separated by a space
	 */
	this(Socket socket, in char[] command, in char[] target)
	{
		this.socket = socket;

		immutable targetStart = command.length + 1;
		immutable targetEnd = targetStart + target.length;
		immutable prefixEnd = targetEnd + 2;

		assert(prefixEnd + terminator.length <= lineBuffer.length,
			"command and target do not fit in the line buffer");

		lineBuffer[0 .. command.length] = command;
		lineBuffer[command.length .. targetStart] = ' ';
		lineBuffer[targetStart .. targetEnd] = target;
		lineBuffer[targetEnd .. prefixEnd] = " :";

		this.messageStart = prefixEnd;
		this.tailStart = prefixEnd;
	}

	/// Number of message bytes that can still be added before the line is full.
	size_t capacity() @property
	{
		return lineBuffer.length - terminator.length - tailStart;
	}

	/// Whether any message bytes have been added since construction or
	/// the last $(D flush).
	bool hasMessage() @property
	{
		return tailStart != messageStart;
	}

	/// The whole message area of the line (everything between the prefix
	/// and the space reserved for the line terminator), regardless of how
	/// much of it has been filled.
	char[] messageBuffer() @property
	{
		return lineBuffer[messageStart .. $ - terminator.length];
	}

	/// Marks the next $(D i) bytes of the message area as filled.
	void commit(size_t i)
	{
		assert(i <= capacity, "commit past the end of the message buffer");
		tailStart += i;
	}

	/// Appends the first $(D n) bytes of $(D source) to the message and
	/// removes them from the front of $(D source).
	void consume(ref const(char)[] source, size_t n)
	{
		// Slice excludes the terminator space so an oversized n is caught
		// by the bounds check instead of spilling into it.
		auto tail = lineBuffer[tailStart .. $ - terminator.length];
		tail[0 .. n] = source[0 .. n];
		tailStart += n;
		source = source[n .. $];
	}

	/// Terminates the line with $(D "\r\n"), sends it on the socket and
	/// resets the message part so a new message can be built.
	void flush()
	{
		immutable sansNewlineLength = tailStart;
		immutable fullLength = sansNewlineLength + terminator.length;
		lineBuffer[sansNewlineLength .. fullLength] = terminator;
		debug(Dirk) std.stdio.writefln(`<< "%s" (length: %s)`, lineBuffer[0 .. sansNewlineLength], sansNewlineLength);
		// NOTE(review): the result of send is not checked, so a short write
		// or a socket error goes unnoticed here — confirm this is intended.
		socket.send(lineBuffer[0 .. fullLength]);
		tailStart = messageStart;
	}
}
148