UInt WaitForAnyMonitor(UInt count, Monitor ** monitors)
{ struct WaitList * nodes;
Monitor * monitor;
UInt i; Int result;
assert(MonitorsAreSorted(count, monitors));
nodes = alloca(sizeof(struct WaitList) * count); for (i = 0; i < count; i++)
nodes[i].thread = GetTLS(); for (i = 0; i < count; i++)
AddWaitList(monitors[i], &nodes[i]); for (i = 0; i < count; i++)
UnlockMonitor(monitors[i]);
LockThread(GetTLS()); while (!TLS(acquiredMonitor))
WaitThreadSignal();
monitor = TLS(acquiredMonitor);
UnlockThread(GetTLS());
// The following loops will initialize <result>, but the compiler // cannot know this; to avoid warnings, we set result to an // initial nonsense value.
result = -1; for (i = 0; i < count; i++) {
LockMonitor(monitors[i]); if (monitors[i] == monitor) {
RemoveWaitList(monitors[i], &nodes[i]);
result = i; // keep it locked for further processing by caller
} else {
RemoveWaitList(monitors[i], &nodes[i]);
UnlockMonitor(monitors[i]);
}
}
LockThread(GetTLS());
TLS(acquiredMonitor) = NULL;
UnlockThread(GetTLS()); return result;
}
staticint GetThreadID(constchar * funcname, Obj thread)
{ if (IS_INTOBJ(thread)) { Int id = INT_INTOBJ(thread); if (0 <= id && id < MAX_THREADS) return id;
} elseif (TNUM_OBJ(thread) == T_THREAD) { return ThreadID(thread);
}
RequireArgumentEx(funcname, thread, NICE_ARGNAME(thread), "must be a thread object or an integer between 0 and " "MAX_THREADS - 1");
}
static Obj FuncCreateThread(Obj self, Obj funcargs)
{ Int i, n;
Obj thread;
Obj templist;
n = LEN_PLIST(funcargs); if (n == 0 || !IS_FUNC(ELM_PLIST(funcargs, 1))) return ArgumentError( "CreateThread: Needs at least one function argument");
Obj func = ELM_PLIST(funcargs, 1); if (NARG_FUNC(func) != n - 1)
ErrorMayQuit("CreateThread: <func> expects %d arguments, but got %d", NARG_FUNC(func), n-1);
templist = NEW_PLIST(T_PLIST, n);
SET_LEN_PLIST(templist, n);
SET_REGION(templist, NULL); // make it public for (i = 1; i <= n; i++)
SET_ELM_PLIST(templist, i, ELM_PLIST(funcargs, i));
thread = RunThread(ThreadedInterpreter, KeepAlive(templist)); if (!thread) return Fail; return thread;
}
static Obj FuncSetRegionName(Obj self, Obj obj, Obj name)
{
Region * region = GetRegionOf(obj); if (!region)
ArgumentError( "SetRegionName: Cannot change name of the public region");
RequireStringRep(SELF_NAME, name);
SetRegionName(region, name); return (Obj)0;
}
static Obj FuncClearRegionName(Obj self, Obj obj)
{
Region * region = GetRegionOf(obj); if (!region)
ArgumentError( "ClearRegionName: Cannot change name of the public region");
SetRegionName(region, (Obj)0); return (Obj)0;
}
static Obj FuncRegionName(Obj self, Obj obj)
{
Obj result;
Region * region = GetRegionOf(obj);
result = GetRegionName(region); if (!result)
result = Fail; return result;
}
staticvoid ExpandChannel(Channel * channel)
{ // Growth ratio should be less than the golden ratio const UInt oldCapacity = channel->capacity; const UInt newCapacity = ((oldCapacity * 25 / 16) | 1) + 1;
GAP_ASSERT(newCapacity > oldCapacity);
UInt i, tail;
Obj newqueue;
newqueue = NEW_PLIST(T_PLIST, newCapacity);
SET_LEN_PLIST(newqueue, newCapacity);
SET_REGION(newqueue, REGION(channel->queue));
channel->capacity = newCapacity; for (i = channel->head; i < oldCapacity; i++)
ADDR_OBJ(newqueue)[i + 1] = ADDR_OBJ(channel->queue)[i + 1]; for (i = 0; i < channel->tail; i++) {
UInt d = oldCapacity + i; if (d >= newCapacity)
d -= newCapacity;
ADDR_OBJ(newqueue)[d + 1] = ADDR_OBJ(channel->queue)[i + 1];
}
tail = channel->head + oldCapacity; if (tail >= newCapacity)
tail -= newCapacity;
channel->tail = tail;
channel->queue = newqueue;
}
staticvoid AddToChannel(Channel * channel, Obj obj, int migrate)
{
Obj children;
Region * region = REGION(channel->queue);
UInt i, len; if (migrate && IS_BAG_REF(obj) && REGION(obj) &&
REGION(obj)->owner == GetTLS() && REGION(obj)->fixed_owner) {
children = ReachableObjectsFrom(obj);
len = children ? LEN_PLIST(children) : 0;
} else {
children = 0;
len = 0;
} for (i = 1; i <= len; i++) {
Obj item = ELM_PLIST(children, i);
SET_REGION(item, region);
}
ADDR_OBJ(channel->queue)[++channel->tail] = obj;
ADDR_OBJ(channel->queue)[++channel->tail] = children; if (channel->tail == channel->capacity)
channel->tail = 0;
channel->size += 2;
}
static Obj RetrieveFromChannel(Channel * channel)
{
Obj obj = ADDR_OBJ(channel->queue)[++channel->head];
Obj children = ADDR_OBJ(channel->queue)[++channel->head];
Region * region = TLS(currentRegion);
UInt i, len = children ? LEN_PLIST(children) : 0;
ADDR_OBJ(channel->queue)[channel->head - 1] = 0;
ADDR_OBJ(channel->queue)[channel->head] = 0; if (channel->head == channel->capacity)
channel->head = 0; for (i = 1; i <= len; i++) {
Obj item = ELM_PLIST(children, i);
SET_REGION(item, region);
}
channel->size -= 2; return obj;
}
staticInt TallyChannel(Channel * channel)
{ Int result;
LockChannel(channel);
result = channel->size / 2;
UnlockChannel(channel); return result;
}
staticvoid SendChannel(Channel * channel, Obj obj, int migrate)
{
LockChannel(channel); if (channel->size == channel->capacity && channel->dynamic)
ExpandChannel(channel); while (channel->size == channel->capacity)
WaitChannel(channel);
AddToChannel(channel, obj, migrate);
SignalChannel(channel);
UnlockChannel(channel);
}
staticvoid MultiSendChannel(Channel * channel, Obj list, int migrate)
{ int listsize = LEN_LIST(list); int i;
Obj obj;
LockChannel(channel); for (i = 1; i <= listsize; i++) { if (channel->size == channel->capacity && channel->dynamic)
ExpandChannel(channel); while (channel->size == channel->capacity)
WaitChannel(channel);
obj = ELM_LIST(list, i);
AddToChannel(channel, obj, migrate);
}
SignalChannel(channel);
UnlockChannel(channel);
}
staticint TryMultiSendChannel(Channel * channel, Obj list, int migrate)
{ int result = 0; int listsize = LEN_LIST(list); int i;
Obj obj;
LockChannel(channel); for (i = 1; i <= listsize; i++) { if (channel->size == channel->capacity && channel->dynamic)
ExpandChannel(channel); if (channel->size == channel->capacity) break;
obj = ELM_LIST(list, i);
AddToChannel(channel, obj, migrate);
result++;
}
SignalChannel(channel);
UnlockChannel(channel); return result;
}
static Obj ReceiveAnyChannel(Obj channelList, int with_index)
{
UInt count = LEN_PLIST(channelList);
UInt i, p;
Monitor ** monitors = alloca(count * sizeof(Monitor *));
Channel ** channels = alloca(count * sizeof(Channel *));
Obj result;
Channel * channel; for (i = 0; i < count; i++)
channels[i] = ObjPtr(ELM_PLIST(channelList, i + 1));
SortChannels(count, channels); for (i = 0; i < count; i++)
monitors[i] = ObjPtr(channels[i]->monitor);
LockMonitors(count, monitors);
p = TLS(multiplexRandomSeed);
p = (p * 5 + 1);
TLS(multiplexRandomSeed) = p;
p %= count; for (i = 0; i < count; i++) {
channel = channels[p]; if (channel->size > 0) break;
p++; if (p >= count)
p = 0;
} if (i < count) // found a channel with data
{
p = i; for (i = 0; i < count; i++) if (i != p)
UnlockMonitor(monitors[i]);
} else// all channels are empty for (;;) { for (i = 0; i < count; i++)
channels[i]->waiting++;
p = WaitForAnyMonitor(count, monitors); for (i = 0; i < count; i++)
channels[i]->waiting--;
channel = channels[p]; if (channel->size > 0) break;
UnlockMonitor(monitors[p]);
LockMonitors(count, monitors);
}
result = RetrieveFromChannel(channel);
SignalChannel(channel);
UnlockMonitor(monitors[p]); if (with_index) {
Obj list = NEW_PLIST(T_PLIST, 2);
SET_LEN_PLIST(list, 2);
SET_ELM_PLIST(list, 1, result); for (i = 1; i <= count; i++) if (ObjPtr(ELM_PLIST(channelList, i)) == channel) {
SET_ELM_PLIST(list, 2, INTOBJ_INT(i)); break;
} return list;
} else return result;
}
static Obj MultiReceiveChannel(Channel * channel, UInt max)
{
Obj result;
UInt count;
UInt i;
LockChannel(channel); if (max > channel->size / 2)
count = channel->size / 2; else
count = max;
result = NEW_PLIST(T_PLIST, count);
SET_LEN_PLIST(result, count); for (i = 0; i < count; i++) {
Obj item = RetrieveFromChannel(channel);
SET_ELM_PLIST(result, i + 1, item);
}
SignalChannel(channel);
UnlockChannel(channel); return result;
}
static Obj InspectChannel(Channel * channel)
{
LockChannel(channel); const UInt count = channel->size / 2;
Obj result = NEW_PLIST(T_PLIST, count);
SET_LEN_PLIST(result, count); for (UInt i = 0, p = channel->head; i < count; i++) {
SET_ELM_PLIST(result, i + 1, ELM_PLIST(channel->queue, p + 1));
p += 2; if (p == channel->capacity)
p = 0;
}
UnlockChannel(channel); return result;
}
static Obj FuncCreateChannel(Obj self, Obj args)
{ int capacity; switch (LEN_PLIST(args)) { case0:
capacity = -1; break; case1: if (IS_INTOBJ(ELM_PLIST(args, 1))) {
capacity = INT_INTOBJ(ELM_PLIST(args, 1)); if (capacity <= 0) return ArgumentError( "CreateChannel: Capacity must be positive"); break;
} return ArgumentError( "CreateChannel: Argument must be capacity of the channel"); default: return ArgumentError( "CreateChannel: Function takes up to one argument");
} return CreateChannel(capacity);
}
staticBOOL IsChannelList(Obj list)
{ int len = LEN_PLIST(list); int i; if (len == 0) return0; for (i = 1; i <= len; i++) if (!IsChannel(ELM_PLIST(list, i))) return0; return1;
}
static Obj FuncReceiveAnyChannel(Obj self, Obj args)
{ if (IsChannelList(args)) return ReceiveAnyChannel(args, 0); else { if (LEN_PLIST(args) == 1 && IS_PLIST(ELM_PLIST(args, 1)) &&
IsChannelList(ELM_PLIST(args, 1))) return ReceiveAnyChannel(ELM_PLIST(args, 1), 0); else return ArgumentError( "ReceiveAnyChannel: Argument list must be channels");
}
}
static Obj FuncReceiveAnyChannelWithIndex(Obj self, Obj args)
{ if (IsChannelList(args)) return ReceiveAnyChannel(args, 1); else { if (LEN_PLIST(args) == 1 && IS_PLIST(ELM_PLIST(args, 1)) &&
IsChannelList(ELM_PLIST(args, 1))) return ReceiveAnyChannel(ELM_PLIST(args, 1), 1); else return ArgumentError( "ReceiveAnyChannelWithIndex: Argument list must be channels");
}
}
static Obj FuncCURRENT_LOCKS(Obj self)
{
UInt i, len = TLS(lockStackPointer);
Obj result = NEW_PLIST(T_PLIST, len);
SET_LEN_PLIST(result, len); for (i = 1; i <= len; i++)
SET_ELM_PLIST(result, i, ELM_PLIST(TLS(lockStack), i)); return result;
}
static Obj FuncNEW_REGION(Obj self, Obj name, Obj prec)
{ if (name != Fail && !IsStringConv(name))
RequireArgument(SELF_NAME, name, "must be a string or fail"); Int p = GetSmallInt(SELF_NAME, prec);
Region * region = NewRegion();
region->prec = p; if (name != Fail)
SetRegionName(region, name); return region->obj;
}
static Obj FuncREGION_PRECEDENCE(Obj self, Obj regobj)
{
Region * region = GetRegionOf(regobj); return region == NULL ? INTOBJ_INT(0) : INTOBJ_INT(region->prec);
}
staticint AutoRetyping = 0;
staticint
MigrateObjects(int count, Obj * objects, Region * target, int retype)
{ int i; if (count && retype && IS_BAG_REF(objects[0]) &&
REGION(objects[0])->owner == GetTLS() && AutoRetyping) { for (i = 0; i < count; i++) if (REGION(objects[i])->owner == GetTLS())
CLEAR_OBJ_FLAG(objects[i], OBJ_FLAG_TESTED); for (i = 0; i < count; i++) { if (REGION(objects[i])->owner == GetTLS() &&
IS_PLIST(objects[i])) { if (!TEST_OBJ_FLAG(objects[i], OBJ_FLAG_TESTED))
TYPE_OBJ(objects[i]); if (retype >= 2)
IS_SSORT_LIST(objects[i]); // record if the list a set in the tnum
}
}
} for (i = 0; i < count; i++) {
Region * region; if (IS_BAG_REF(objects[i])) {
region = REGION(objects[i]); if (!region || region->owner != GetTLS()) return0;
}
} // If we are migrating records to a region where they become immutable, // they need to be sorted, as sorting upon access may prove impossible. for (i = 0; i < count; i++) {
Obj obj = objects[i]; if (TNUM_OBJ(obj) == T_PREC) {
SortPRecRNam(obj);
}
SET_REGION(obj, target);
} return1;
}
static Obj FuncSHARE(Obj self, Obj obj, Obj name, Obj prec)
{ if (name != Fail && !IsStringConv(name))
RequireArgument(SELF_NAME, name, "must be a string or fail"); Int p = GetSmallInt(SELF_NAME, prec);
Region * region = NewRegion();
region->prec = p;
Obj reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1, region, 1)) return ArgumentError( "SHARE: Thread does not have exclusive access to objects"); if (name != Fail)
SetRegionName(region, name); return obj;
}
static Obj FuncSHARE_RAW(Obj self, Obj obj, Obj name, Obj prec)
{ if (name != Fail && !IsStringConv(name))
RequireArgument(SELF_NAME, name, "must be a string or fail"); Int p = GetSmallInt(SELF_NAME, prec);
Region * region = NewRegion();
region->prec = p;
Obj reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1, region, 0)) return ArgumentError( "SHARE_RAW: Thread does not have exclusive access to objects"); if (name != Fail)
SetRegionName(region, name); return obj;
}
static Obj FuncSHARE_NORECURSE(Obj self, Obj obj, Obj name, Obj prec)
{ if (name != Fail && !IsStringConv(name))
RequireArgument(SELF_NAME, name, "must be a string or fail"); Int p = GetSmallInt(SELF_NAME, prec);
Region * region = NewRegion();
region->prec = p; if (!MigrateObjects(1, &obj, region, 0)) return ArgumentError("SHARE_NORECURSE: Thread does not have " "exclusive access to objects"); if (name != Fail)
SetRegionName(region, name); return obj;
}
static Obj FuncADOPT(Obj self, Obj obj)
{
Obj reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1,
TLS(threadRegion), 0)) return ArgumentError( "ADOPT: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncADOPT_NORECURSE(Obj self, Obj obj)
{ if (!MigrateObjects(1, &obj, TLS(threadRegion), 0)) return ArgumentError("ADOPT_NORECURSE: Thread does not have " "exclusive access to objects"); return obj;
}
static Obj FuncMIGRATE(Obj self, Obj obj, Obj target)
{
Region * target_region = GetRegionOf(target);
Obj reachable; if (!target_region ||
IsLocked(target_region) != LOCK_STATUS_READWRITE_LOCKED) return ArgumentError("MIGRATE: Thread does not have exclusive access " "to target region");
reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1,
target_region, 1)) return ArgumentError( "MIGRATE: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncMIGRATE_RAW(Obj self, Obj obj, Obj target)
{
Region * target_region = GetRegionOf(target);
Obj reachable; if (!target_region ||
IsLocked(target_region) != LOCK_STATUS_READWRITE_LOCKED) return ArgumentError("MIGRATE_RAW: Thread does not have exclusive access " "to target region");
reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1,
target_region, 0)) return ArgumentError( "MIGRATE_RAW: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncMIGRATE_NORECURSE(Obj self, Obj obj, Obj target)
{
Region * target_region = GetRegionOf(target); if (!target_region ||
IsLocked(target_region) != LOCK_STATUS_READWRITE_LOCKED) return ArgumentError("MIGRATE_NORECURSE: Thread does not have " "exclusive access to target region"); if (!MigrateObjects(1, &obj, target_region, 0)) return ArgumentError("MIGRATE_NORECURSE: Thread does not have " "exclusive access to object"); return obj;
}
static Obj FuncMAKE_PUBLIC(Obj self, Obj obj)
{
Obj reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1, NULL, 0)) return ArgumentError( "MAKE_PUBLIC: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncMAKE_PUBLIC_NORECURSE(Obj self, Obj obj)
{ if (!MigrateObjects(1, &obj, NULL, 0)) return ArgumentError("MAKE_PUBLIC_NORECURSE: Thread does not have " "exclusive access to objects"); return obj;
}
static Obj FuncFORCE_MAKE_PUBLIC(Obj self, Obj obj)
{ if (!IS_BAG_REF(obj)) return ArgumentError("FORCE_MAKE_PUBLIC: Argument is a small integer " "or finite-field element");
MakeBagPublic(obj); return obj;
}
static Obj FuncMakeThreadLocal(Obj self, Obj var)
{ constchar * name;
UInt gvar; if (!IsStringConv(var) || GET_LEN_STRING(var) == 0)
RequireArgument(SELF_NAME, var, "must be a variable name");
name = CONST_CSTR_STRING(var);
gvar = GVarName(name);
name = CONST_CSTR_STRING(NameGVar(gvar)); // to apply namespace scopes where needed
MakeThreadLocalVar(gvar, RNamName(name)); return (Obj)0;
}
static Obj FuncMakeReadOnlyObj(Obj self, Obj obj)
{
Region * region = GetRegionOf(obj);
Obj reachable; if (!region || region == ReadOnlyRegion) return obj;
reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1,
ReadOnlyRegion, 1)) return ArgumentError( "MakeReadOnlyObj: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncMakeReadOnlyRaw(Obj self, Obj obj)
{
Region * region = GetRegionOf(obj);
Obj reachable; if (!region || region == ReadOnlyRegion) return obj;
reachable = ReachableObjectsFrom(obj); if (!MigrateObjects(LEN_PLIST(reachable), ADDR_OBJ(reachable) + 1,
ReadOnlyRegion, 0)) return ArgumentError( "MakeReadOnlyObj: Thread does not have exclusive access to objects"); return obj;
}
static Obj FuncMakeReadOnlySingleObj(Obj self, Obj obj)
{
Region * region = GetRegionOf(obj); if (!region || region == ReadOnlyRegion) return obj; if (!MigrateObjects(1, &obj, ReadOnlyRegion, 0)) return ArgumentError("MakeReadOnlySingleObj: Thread does not have " "exclusive access to object"); return obj;
}
/**************************************************************************** ** *FInitKernel(<module>)........initialisekerneldatastructures
*/ staticInt InitKernel(StructInitInfo * module)
{ // set the bag type names (for error messages and debugging)
InitBagNamesFromTable(BagNames);
// make bag types public
MakeBagTypePublic(T_THREAD);
MakeBagTypePublic(T_REGION);
MakeBagTypePublic(T_SEMAPHORE);
MakeBagTypePublic(T_CHANNEL);
MakeBagTypePublic(T_SYNCVAR);
MakeBagTypePublic(T_BARRIER);
/**************************************************************************** ** *FInitInfoThreadAPI()...............tableofinitfunctions
*/ static StructInitInfo module = { // init struct using C99 designated initializers; for a full list of // fields, please refer to the definition of StructInitInfo
.type = MODULE_BUILTIN,
.name = "threadapi",
.initKernel = InitKernel,
.initLibrary = InitLibrary,
};
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.