Merge pull request #1393 from CyberShadow/std-socket-dynamic-socketset

std.socket: Remove size restriction from SocketSet
This commit is contained in:
Dmitry Olshansky 2014-01-27 00:49:05 -08:00
commit 3f521ca8eb

View file

@ -2023,109 +2023,166 @@ struct TimeVal
/**
* A collection of sockets for use with $(D Socket.select).
*
* $(D SocketSet) allows specifying the capacity of the underlying
* $(D fd_set), however users should be aware that the exact meaning of this
* value varies depending on the current platform:
* $(UL $(LI On POSIX, $(D fd_set) is a bit array of file descriptors. The
* $(D SocketSet) capacity specifies the highest file descriptor which can be
* stored in the set.)
* $(LI on Windows, $(D fd_set) is an array of socket handles. Capacity
* indicates the actual number of sockets that can be stored in the set.))
* $(D SocketSet) wraps the platform $(D fd_set) type. However, unlike
* $(D fd_set), $(D SocketSet) is not statically limited to $(D FD_SETSIZE)
* or any other limit, and grows as needed.
*/
class SocketSet
{
private:
version(Windows)
version (Windows)
{
// the maximum number of sockets the allocated fd_set can hold
uint fdsetCapacity;
// On Windows, fd_set is an array of socket handles,
// following a word containing the fd_set instance size.
// We use one dynamic array for everything, and use its first
// element(s) for the count.
fd_set* set;
@property uint count() const { return set.fd_count; }
alias fd_set_count_type = typeof(fd_set.init.fd_count);
alias fd_set_type = typeof(fd_set.init.fd_array[0]);
static assert(fd_set_type.sizeof == socket_t.sizeof);
// Number of fd_set_type elements at the start of our array that are
// used for the socket count and alignment
enum FD_SET_OFFSET = fd_set.fd_array.offsetof / fd_set_type.sizeof;
static assert(FD_SET_OFFSET);
static assert(fd_set.fd_count.offsetof % fd_set_type.sizeof == 0);
fd_set_type[] set;
final void resize(size_t size)
{
set.length = FD_SET_OFFSET + size;
}
final ref fd_set_count_type count() @property inout
{
assert(set.length);
return *cast(fd_set_count_type*)set.ptr;
}
final size_t capacity() @property const
{
return set.length - FD_SET_OFFSET;
}
final inout socket_t[] fds() inout @property
{
return cast(socket_t[])set[FD_SET_OFFSET..FD_SET_OFFSET+count];
}
}
else version(Posix)
else
version (Posix)
{
int fdsetMax;
// On Posix, fd_set is a bit array. We assume that the fd_set
// type (declared in core.sys.posix.sys.select) is a structure
// containing a single field, a static array.
static assert(fd_set.tupleof.length==1);
// This is the type used in the fd_set array.
// Using the type of the correct size is important for big-endian
// architectures.
alias fd_set_type = typeof(fd_set.init.tupleof[0][0]);
// Number of file descriptors represented by one fd_set_type
enum FD_NFDBITS = 8 * fd_set_type.sizeof;
static fd_set_type mask(uint n)
{
return (cast(fd_set_type)1) << (n % FD_NFDBITS);
}
// Array size to fit that many sockets
static size_t lengthFor(size_t size)
{
return (size + (FD_NFDBITS-1)) / FD_NFDBITS;
}
fd_set_type[] set;
final void resize(size_t size)
{
set.length = lengthFor(size);
}
// Make sure we can fit that many sockets
final void setMinCapacity(size_t size)
{
auto length = lengthFor(size);
if (set.length < length)
set.length = length;
}
final size_t capacity() @property const
{
return set.length / FD_NFDBITS;
}
fd_set setData;
final @property fd_set* set() { return &setData; }
final @property const(fd_set)* set() const { return &setData; }
int maxfd;
uint count;
}
else
static assert(false, "Unknown platform");
public:
/**
* Set the capacity of this $(D SocketSet). The exact meaning of the
* $(D max) parameter varies from platform to platform.
* Throws: $(D SocketParameterException) if $(D max) exceeds this
* platform's maximum socket set size.
* Create a SocketSet with a specific initial capacity (defaults to
* $(D FD_SETSIZE), the system's default capacity).
*/
this(uint max)
this(size_t size = FD_SETSIZE)
{
version(Windows)
{
fdsetCapacity = max;
set = FD_CREATE(max);
}
else version(Posix)
{
// TODO (needs druntime changes)
enforce(max <= FD_SETSIZE, new SocketParameterException(
"Maximum socket set size exceeded for this platform"));
fdsetMax = max;
}
resize(size);
reset();
}
/// Uses the default capacity for the system.
this()
{
this(FD_SETSIZE);
}
/// Reset the $(D SocketSet) so that there are 0 $(D Socket)s in the collection.
void reset()
{
FD_ZERO(set);
version(Posix)
{
maxfd = -1;
version (Windows)
count = 0;
else
{
set[] = 0;
maxfd = -1;
}
}
void add(socket_t s)
{
// Make sure too many sockets don't get added.
version(Windows)
version (Windows)
{
enforce(count < fdsetCapacity, new SocketParameterException(
"SocketSet capacity exceeded"));
if (count == capacity)
{
set.length *= 2;
set.length = set.capacity;
}
fds[count++] = s;
}
else version(Posix)
else
{
enforce(s < fdsetMax, new SocketParameterException(
"Socket descriptor index exceeds SocketSet capacity"));
}
FD_SET(s, set);
version(Posix)
{
++count;
if(s > maxfd)
auto index = s / FD_NFDBITS;
auto length = set.length;
if (index >= length)
{
while (length < index)
length *= 2;
set.length = length;
set.length = set.capacity;
}
set[index] |= mask(s);
if (maxfd < s)
maxfd = s;
}
}
/// Add a $(D Socket) to the collection.
/// Throws: $(D SocketParameterException) if the capacity of this
/// $(D SocketSet) has been exceeded.
/// The socket must not already be in the collection.
void add(Socket s)
{
add(s.sock);
@ -2133,22 +2190,27 @@ public:
void remove(socket_t s)
{
version(Posix)
version (Windows)
{
enforce(s < fdsetMax, new SocketParameterException(
"Socket descriptor index exceeds SocketSet capacity"));
import std.algorithm : countUntil;
auto fds = fds;
auto p = fds.countUntil(s);
if (p >= 0)
fds[p] = fds[--count];
}
FD_CLR(s, set);
version(Posix)
else
{
--count;
auto index = s / FD_NFDBITS;
if (index >= set.length)
return;
set[index] &= ~mask(s);
// note: adjusting maxfd would require scanning the set, not worth it
}
}
/// Remove this $(D Socket) from the collection.
/// Does nothing if the socket is not in the collection already.
void remove(Socket s)
{
remove(s.sock);
@ -2156,57 +2218,153 @@ public:
int isSet(socket_t s) const
{
version(Posix)
version (Windows)
{
enforce(s < fdsetMax, new SocketParameterException(
"Socket descriptor index exceeds SocketSet capacity"));
import std.algorithm;
return fds.canFind(s) ? 1 : 0;
}
else
{
if (s > maxfd)
return 0;
auto index = s / FD_NFDBITS;
return (set[index] & mask(s)) ? 1 : 0;
}
return FD_ISSET(s, set);
}
/// Returns nonzero if this $(D Socket) is in the collection.
/// Return nonzero if this $(D Socket) is in the collection.
int isSet(Socket s) const
{
return isSet(s.sock);
}
/// Return the capacity of this $(D SocketSet). The exact meaning of the
/// return value varies from platform to platform.
/// Return the current capacity of this $(D SocketSet). The exact
/// meaning of the return value varies from platform to platform.
/// Note that since D 2.065, this value does not indicate a
/// restriction, and $(D SocketSet) will grow its capacity as
/// needed automatically.
@property uint max() const
{
version(Windows)
{
return fdsetCapacity;
}
else version(Posix)
{
return fdsetMax;
}
return cast(uint)capacity;
}
fd_set* toFd_set()
{
return set;
return cast(fd_set*)set.ptr;
}
int selectn() const
{
version(Windows)
version (Windows)
{
return count;
}
else version(Posix)
else version (Posix)
{
return maxfd + 1;
}
}
}
unittest
{
auto fds = cast(socket_t[])
[cast(socket_t)1, 2, 0, 1024, 17, 42, 1234, 77, 77+32, 77+64];
auto set = new SocketSet();
foreach (fd; fds) assert(!set.isSet(fd));
foreach (fd; fds) set.add(fd);
foreach (fd; fds) assert(set.isSet(fd));
// Make sure SocketSet reimplements fd_set correctly
auto fdset = set.toFd_set();
foreach (fd; fds[0]..cast(socket_t)(fds[$-1]+1))
assert(cast(bool)set.isSet(fd) == cast(bool)FD_ISSET(fd, fdset));
foreach (fd; fds)
{
assert(set.isSet(fd));
set.remove(fd);
assert(!set.isSet(fd));
}
}
unittest
{
softUnittest({
enum PAIRS = 768;
version(Posix)
{
enum LIMIT = 2048;
static assert(LIMIT > PAIRS*2);
import core.sys.posix.sys.resource;
rlimit fileLimit;
getrlimit(RLIMIT_NOFILE, &fileLimit);
assert(fileLimit.rlim_max > LIMIT, "Open file hard limit too low");
fileLimit.rlim_cur = LIMIT;
setrlimit(RLIMIT_NOFILE, &fileLimit);
}
Socket[2][PAIRS] pairs;
foreach (ref pair; pairs)
pair = socketPair();
scope(exit)
{
foreach (pair; pairs)
{
pair[0].close();
pair[1].close();
}
}
import std.random;
auto rng = Xorshift(42);
pairs[].randomShuffle(rng);
auto readSet = new SocketSet();
auto writeSet = new SocketSet();
auto errorSet = new SocketSet();
foreach (testPair; pairs)
{
void fillSets()
{
readSet.reset();
writeSet.reset();
errorSet.reset();
foreach (ref pair; pairs)
foreach (s; pair[])
{
readSet.add(s);
writeSet.add(s);
errorSet.add(s);
}
}
fillSets();
auto n = Socket.select(readSet, writeSet, errorSet);
assert(n == PAIRS*2); // All in writeSet
assert(writeSet.isSet(testPair[0]));
assert(writeSet.isSet(testPair[1]));
assert(!readSet.isSet(testPair[0]));
assert(!readSet.isSet(testPair[1]));
assert(!errorSet.isSet(testPair[0]));
assert(!errorSet.isSet(testPair[1]));
ubyte[1] b;
testPair[0].send(b[]);
fillSets();
n = Socket.select(readSet, null, null);
assert(n == 1); // testPair[1]
assert(readSet.isSet(testPair[1]));
assert(!readSet.isSet(testPair[0]));
testPair[1].receive(b[]);
}
});
}
/// The level at which a socket option is defined:
enum SocketOptionLevel: int
@ -3099,6 +3257,12 @@ public:
{
fe = null;
}
// Make sure the sets' capacity matches, to avoid select reading
// out of bounds just because one set was bigger than another
if (checkRead ) checkRead .setMinCapacity(n);
if (checkWrite) checkWrite.setMinCapacity(n);
if (checkError) checkError.setMinCapacity(n);
}
int result = .select(n, fr, fw, fe, &timeout.ctimeval);