This application consists of two types of node: leaders, who make requests, and slaves, who service requests. Initially the leader will broadcast a request asking for any free slave. Once a slave replies to this request, the leader collects enough sensor data to fill an array of size 10 and transmits it to the slave that acknowledged it. The slave performs a Fourier transform on the received data, calculates the maximum value and returns this to the leader.
/**
* Basic sensor net grid application
* Leader broadcasts a request for slaves, a slave replies with its ID.
* Leader collects sensor data and unicasts to the slave
* Slave does fast fourier transform on data, finds max and returns this to leader
* Leader gets work and starts again
*
* No attempt is made to recover missing/lost packets; if this happens there is a timeout and the request is tried again
*/
// Structs carried as the payload of radio transmissions between nodes.
// workPacket: a unit of work; id is the sending node's address, workNum the
// sender's sequence number, work the array of sensor readings to transform.
type workPacket is struct (unsigned id ; unsigned workNum ; integer[] work)
// replyPacket: the result of one unit of work; id is the replying node's
// address, workNum echoes the work packet's number, answer is the computed value.
type replyPacket is struct ( unsigned id ; unsigned workNum ; integer answer )
// ackPacket: response to a request; id is the acknowledging node's address.
// state is always sent as true in this file.
type ackPacket is struct (unsigned id ; bool state)
// requestPacket: broadcast request for a worker; id is the requesting node's address.
type requestPacket is struct (unsigned id)
/*------------------------------------------------------------------*/
// Builds the unicast packet that carries a computed answer.
//   id      : address of the node sending the reply
//   dest    : address the RadioPacket is directed to
//   workNum : number of the work item being answered
//   ans     : the computed result
proc createReplyPacket (unsigned id ; unsigned dest ; unsigned workNum ; integer ans) : RadioPacket {
    answerBody = new replyPacket(id, workNum, ans)
    return new RadioPacket(dest , any(answerBody))
}
/*------------------------------------------------------------------*/
// Builds the unicast packet that hands an array of readings to a worker.
//   dest    : address the RadioPacket is directed to
//   id      : address of the node sending the work
//   workNum : sequence number identifying this work item
//   work    : the readings to be processed
proc createWorkPacket (unsigned dest ; unsigned id ; unsigned workNum ; integer[] work) : RadioPacket {
    workBody = new workPacket(id, workNum, work)
    return new RadioPacket(dest, any(workBody))
}
/*------------------------------------------------------------------*/
// Builds the request payload announcing that node `id` is looking for a worker.
// Returns the bare struct (not a RadioPacket); callers in this file broadcast it as an `any`.
proc createRequestPacket (unsigned id ) : requestPacket {
    return new requestPacket(id)
}
/*------------------------------------------------------------------*/
// Builds the unicast packet with which a node answers a request.
//   id    : address of the acknowledging node
//   dest  : address the RadioPacket is directed to
//   state : flag carried in the acknowledgement
proc createAckPacket (unsigned id ; unsigned dest ;bool state ) : RadioPacket {
    ackBody = new ackPacket(id, state)
    return new RadioPacket(dest , any(ackBody))
}
/*******************************************************************************/
// Channel interface presented by the leader component.
//   bcast       (out) : payloads to be broadcast (request packets are sent here)
//   ucast       (out) : addressed RadioPackets (acks, work and replies)
//   dataRequest (out) : a `true` is sent here to ask for one sensor reading
//   dataReply   (in)  : the integer reading delivered in response
//   button      (in)  : button events; a press triggers the first request
//   ether       (in)  : RadioPackets received from the radio
//   timerChan   (in)  : channel passed to setTimer/unsetTimer; delivers the timeout tick
type leaderI is interface(out any bcast;
out RadioPacket ucast ;
out bool dataRequest ;
in integer dataReply ;
in integer button ;
in RadioPacket ether ;
in bool timerChan )
/*
 * Grid node component. Despite its name it implements both roles, selected
 * purely by the type of packet that arrives:
 *   - as requester: on a button press (or timeout) broadcast a request; on an
 *     ack, sample the sensor until `data` is full and unicast the work to the
 *     acknowledging node; on a reply, start the next round.
 *   - as worker: on a request, unicast an ack; on a work packet, run ifft on
 *     the data, find the maximum and unicast it back.
 * Lost packets are not recovered; the timeout simply re-broadcasts the request.
 */
component leader presents leaderI{
    // NOTE(review): dataNum is not referenced anywhere else in this component.
    dataNum = 0
    // Sequence number stamped on each work packet; incremented after each dispatch.
    num = $0
    //The data array: sensor readings that make up one unit of work
    data = new integer[10] of 0
    // NOTE(review): init is cleared on the first button press but never read.
    init = true

    constructor(){
    }

    behaviour{
        //printString("wait for packet or timeout\n");
        // Wait for a button press, a radio packet or a timeout, and handle exactly one of them
        select {
            receive press from button : {
                //printString("LEADER: button press\n");
                init := false
                //initial request for a worker
                req = createRequestPacket(getNodeAddress())
                setTimer(timerChan, 15.0, false)
                send any(req) on bcast
            }
            receive packet from ether : {
                /* Packet is either
                 * reply from worker : collect sensor data and send
                 * answer from worker : start again
                 * request from leader : reply with node id
                 * work from leader : fourier transform, find max, reply
                 * unknown packet : do nothing go round again and wait for packet or timeout
                 */
                project packet.payload as reading onto
                workPacket : {
                    //printString("work\n");
                    ifft(reading.work, reading.work.length)
                    // Seed the running maximum from the data itself. Seeding with 0
                    // (as before) reports 0 whenever every transformed value is negative.
                    // An empty array still yields 0.
                    max = 0
                    if reading.work.length > 0 then {
                        max := reading.work[0]
                    }
                    for i = 0 .. reading.work.length-1 do {
                        if reading.work[i] > max then {
                            max := reading.work[i]
                        }
                    }
                    // reading.id is the node that sent the work, so the answer goes back to it
                    reply = createReplyPacket(getNodeAddress(),reading.id, reading.workNum, max)
                    send reply on ucast
                }
                requestPacket : {
                    //printString("request\n");
                    // Volunteer for the requester's work
                    ack = createAckPacket(getNodeAddress(), reading.id ,true )
                    send ack on ucast
                }
                ackPacket : {
                    //printString("ack\n");
                    // A worker answered: cancel the request timeout while sampling
                    unsetTimer(timerChan)
                    // Fill the work array one reading at a time (request, then wait for the value)
                    for i = 0 .. data.length - 1 do {
                        send true on dataRequest
                        receive voltage from dataReply
                        data[i] := voltage
                    }
                    work = createWorkPacket(reading.id, getNodeAddress(), num , data)
                    num := num + $1
                    // Re-arm the timeout so a lost work or reply packet restarts the protocol
                    setTimer(timerChan, 5.0, false)
                    send work on ucast
                }
                replyPacket : {
                    //printString("rinse and repeat\n");
                    unsetTimer(timerChan)
                    //start again
                    req = createRequestPacket(getNodeAddress())
                    setTimer(timerChan, 5.0, false)
                    send any(req) on bcast
                }
                default : {}
            }
            //timeout: try again
            receive tick from timerChan : {
                //printString("timeout\n");
                req = createRequestPacket(getNodeAddress())
                setTimer(timerChan, 5.0, false)
                send any(req) on bcast
            }
        }
    }
}
/*****************************************************************************/
// Instantiate the node component and wire its channels to the platform's
// button, radio and sensor channels. timerChan is not connected here; the
// component passes it directly to setTimer/unsetTimer.
l = new leader()
// Button press starts the first request
connect l.button to buttonSensor.output
// Radio: broadcast and unicast transmit, plus the receive channel
connect l.bcast to radio.broadcastSend
connect l.ucast to radio.unicastSend
connect l.ether to radio.received
// Sensor readings are requested from and delivered by the battery channels
connect l.dataRequest to sensors.batteryRequest
connect l.dataReply to sensors.batteryOut
The corresponding nesC program for TinyOS is shown below.
/*
 * Top-level wiring for the grid application: binds every interface used by
 * gridC (aliased as App) to the component that provides it.
 */
configuration gridAppC {}
implementation {
  /* Application logic and boot sequencing */
  components gridC as App, MainC;

  /* Local peripherals: LEDs, the demo sensor and a millisecond timer */
  components LedsC, new DemoSensorC();
  components new TimerMilliC();

  /* Radio stack: control plus one sender/receiver pair on AM_RADIO_SENSE_MSG */
  components ActiveMessageC;
  components new AMSenderC(AM_RADIO_SENSE_MSG);
  components new AMReceiverC(AM_RADIO_SENSE_MSG);

  /* Boot and peripherals */
  App.Boot -> MainC.Boot;
  App.Leds -> LedsC;
  App.MilliTimer -> TimerMilliC;
  App.Read -> DemoSensorC;

  /* Radio */
  App.RadioControl -> ActiveMessageC;
  App.AMSend -> AMSenderC;
  App.Packet -> AMSenderC;
  App.Receive -> AMReceiverC;
}
/**
* Simple grid application. One node asks for slaves to do some work, either getting a reply and giving the work, or timing out and trying again.
*
* @author Paul Harvey
* @date September 26 2011
*/
module gridC @safe(){
  uses {
    interface Leds;
    interface Boot;
    interface Receive;
    interface AMSend;
    /* Timer is a generic interface and needs its precision tag. TMilli matches
     * the TimerMilliC it is wired to and the millisecond periods passed to
     * startOneShot. */
    interface Timer<TMilli> as MilliTimer;
    interface Packet;
    /* Read is generic in the value type. uint16_t matches the `data`
     * parameter of the Read.readDone handler in this module. */
    interface Read<uint16_t>;
    interface SplitControl as RadioControl;
  }
}
implementation {
/* Single outgoing message buffer: every send in this module builds its payload in here. */
message_t packet;
/* work_count: number stamped on the next work packet (collectTask).
 * sensor_count: how many readings of the current batch have been stored (Read.readDone). */
int work_count = 0, sensor_count = 0;
/* Readings collected by Read.readDone and copied into the work packet by collectTask. */
int sensor_data[10];
/* current_work: workNum of the work packet being processed, echoed in the reply.
 * NOTE(review): start, count, stop and lock are not referenced by the visible implementation. */
int start = 0, count = 0, stop, lock = 0, current_work;
/* NOTE(review): work_lock is not referenced by the visible implementation. */
int work_lock = 0;
/* NOTE(review): pload is never assigned in the visible code, yet Receive.receive reads through it. */
void *pload;
/* Work data handed to ifft and scanned for the maximum in workTask. */
int tarray[10];
/* Forward declaration only; the definition is not part of this listing.
 * NOTE(review): callers pass int arrays where int16_t[] is declared - confirm int is 16 bits on the target. */
void ifft(int16_t xre[], int16_t xim[], uint16_t n);
/* Boot completion: the only start-up action is to power up the radio.
 * The protocol itself begins in RadioControl.startDone. */
event void Boot.booted()
{
  call RadioControl.start();
}
/*
 * Radio start completion. If the radio failed to start, the start is retried,
 * since nothing else in this module can work without it. Once it is up, node 1
 * (the requester) broadcasts the first request for a worker and arms the retry
 * timeout; all other nodes just wait for packets.
 */
event void RadioControl.startDone(error_t err) {
  request_packet_t* rp;

  if (err != SUCCESS) {
    /* Previously the error was ignored and the node carried on with a dead radio. */
    call RadioControl.start();
    return;
  }

  // printf("Radio started %d\n", TOS_NODE_ID);
  if (TOS_NODE_ID == 1) {
    //send request packet
    rp = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
    if (rp == NULL) {
      return;
    }
    rp->id = TOS_NODE_ID;
    call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));
    /* Timeout after which the request is re-broadcast (see MilliTimer.fired) */
    call MilliTimer.startOneShot(5120);
    // printf("Sent request packet\n");
  }
  // printfflush();
}
/* Radio stop completion: intentionally empty; no RadioControl.stop() call appears in this module. */
event void RadioControl.stopDone(error_t err) {}
/*
 * Timeout handler: the expected packet did not arrive in time, so node 1
 * re-broadcasts its request and re-arms the timeout. On every other node the
 * event is a no-op.
 */
event void MilliTimer.fired() {
  request_packet_t* request;

  if (TOS_NODE_ID != 1) {
    return;
  }

  request = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
  if (request == NULL) {
    return;
  }

  request->id = TOS_NODE_ID;
  call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));
  call MilliTimer.startOneShot(5120);
}
/*--------------------------------------------------------------------------------*/
/*
 * Packs the collected sensor readings into a work packet and sends it.
 * Posted by Read.readDone once sensor_data holds a full batch.
 */
task void collectTask() {
  work_packet_t *wp;
  int i = 0;

  wp = (work_packet_t*)call Packet.getPayload(&packet, sizeof(work_packet_t));
  if (wp == NULL) {
    return;
  }
  wp->id = TOS_NODE_ID;
  wp->workNum = work_count++;
  wp->size = 10;
  /* Copy the batch into the packet (loop header restored; it was garbled in the listing) */
  for (i = 0; i < 10; i++) {
    wp->data[i] = sensor_data[i];
    // printf("Data[%d] = %d\n", i, wp->data[i]);printfflush();
  }
  /* NOTE(review): the destination is hard-coded to node 0; the id of the node
   * that acknowledged is not recorded anywhere - confirm this is intended. */
  call AMSend.send(0, &packet, sizeof(work_packet_t));
}
/*--------------------------------------------------------------------------------*/
/*
 * Sensor read completion. Stores the reading and either requests the next one
 * or, once 10 readings are held, posts collectTask to send them.
 *
 * result : status of the read; on failure the partial batch is abandoned and
 *          the pending request timeout restarts the protocol.
 * data   : the value read.
 */
event void Read.readDone(error_t result, uint16_t data) {
  if (result != SUCCESS) {
    /* Don't put an invalid reading into the batch; drop the round instead. */
    sensor_count = 0;
    return;
  }

  if (sensor_count < 10) {
    sensor_data[sensor_count++] = data;
  }

  if (sensor_count < 10) {
    call Read.read();
  }
  else {
    /* Batch complete. Previously an 11th read was issued and its value
     * discarded before the task was posted. */
    sensor_count = 0;
    post collectTask();
  }
}
/*--------------------------------------------------------------------------------*/
/*
 * Worker side: transforms the received data in tarray, finds the largest
 * value and sends it back in a reply packet. Posted by Receive.receive after
 * it has copied a work packet into tarray and recorded current_work.
 */
task void workTask() {
  reply_packet_t* rep;
  int temp[10];
  int max, i;
  // printf("work task\n");printfflush();

  /* temp is passed as ifft's xim argument and was previously left
   * uninitialised. Zero it so ifft gets a defined input
   * (assumes xim is the imaginary part - confirm against ifft's definition). */
  for (i = 0; i < 10; i++) {
    temp[i] = 0;
  }

  //fourier transform
  ifft(tarray, temp, 10);

  //find max (loop header restored; it was garbled in the listing).
  //Seeded from the data so an all-negative result is not reported as 0.
  max = tarray[0];
  for (i = 1; i < 10; i++) {
    if (tarray[i] > max) {
      max = tarray[i];
    }
  }

  //send reply packet
  rep = (reply_packet_t*)call Packet.getPayload(&packet, sizeof(reply_packet_t));
  if (rep == NULL) {
    /* The check was commented out; without it a failed getPayload is dereferenced. */
    return;
  }
  rep->id = TOS_NODE_ID;
  rep->workNum = current_work;
  rep->ans = max;
  /* Node 1 is the requester throughout this module */
  call AMSend.send(1, &packet, sizeof(reply_packet_t));
  /* Kept from the original; MilliTimer.fired does nothing on nodes other than 1 */
  call MilliTimer.startOneShot(5120);
}
/*--------------------------------------------------------------------------------*/
/*
 * Radio receive handler. The packet type is inferred from the payload length:
 *   request -> answer with an ack
 *   ack     -> start collecting sensor data (work is sent from collectTask)
 *   work    -> copy the data and post workTask to compute the reply
 *   reply   -> start the next round with a fresh broadcast request
 *
 * Always returns bufPtr, handing the receive buffer back to the radio stack.
 * The error paths used to return NULL, which leaves the stack without a
 * buffer for the next packet.
 *
 * NOTE(review): the dispatch relies on the four packet structs having distinct
 * sizes; their definitions are not part of this listing.
 */
event message_t* Receive.receive(message_t* bufPtr, void* payload, uint8_t len) {
  request_packet_t* rp;
  ack_packet_t* ap;
  work_packet_t* wp;
  int i = 0;
  call Leds.led1Toggle();
  // printf("Packet received\n");
  //free request -> ack
  if(len == sizeof(request_packet_t)) {
    printf("Got Request\n");printfflush();
    call MilliTimer.stop();
    ap = (ack_packet_t*)call Packet.getPayload(&packet, sizeof(ack_packet_t));
    if (ap == NULL) {
      return bufPtr;
    }
    ap->id = TOS_NODE_ID;
    ap->state = 1;
    /* Reply directly to the node named in the request */
    call AMSend.send(((request_packet_t*)payload)->id, &packet, sizeof(ack_packet_t));
  }
  //ack packet -> work
  if(len == sizeof(ack_packet_t)) {
    // printf("Got ack\n");printfflush();
    call Read.read();
  }
  // work -> reply
  if(len == sizeof(work_packet_t)) {
    // printf("got work\n");printfflush();
    wp = (work_packet_t*)payload;
    /* Copy the data out of the receive buffer before handing it back
     * (loop header restored; it was garbled in the listing) */
    for (i = 0; i < 10; i++) {
      tarray[i] = wp->data[i];
      // printf("Data[%d] = %d\n", i, tarray[i]);
      // printfflush();
    }
    /* Read from the received payload; the global `pload` used here before is never assigned. */
    current_work = wp->workNum;
    // printf("post work\n");printfflush();
    post workTask();
  }
  //reply -> request
  if(len == sizeof(reply_packet_t)) {
    call MilliTimer.stop();
    // printf("got reply\n");
    // printf("Result : %d , ", ((reply_packet_t*)payload)->ans);
    /* Start again */
    rp = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
    if (rp == NULL) {
      return bufPtr;
    }
    rp->id = TOS_NODE_ID;
    call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));
    call MilliTimer.startOneShot(5120);
  }
  return bufPtr;
}
/* Send completion is ignored: the send status is not checked and nothing is
 * retried here; lost packets are handled only by the request timeout. */
event void AMSend.sendDone(message_t* bufPtr, error_t error) {
}
}