Grid

This application consists of two types of node: leaders, who make requests, and slaves, who service requests. Initially the leader will broadcast a request asking for any free slave. Once a slave replies to this request, the leader collects enough sensor data to fill an array of size 10 and transmits it to the slave that acknowledged it. The slave performs a Fourier transform on the received data, calculates the maximum value and returns this to the leader.

/**
* Basic sensor net grid application
* Leader broadcasts a request for slaves, a slave replies with its ID.
* Leader collects sensor data and unicasts to the slave
* Slave does fast fourier transform on data, finds max and returns this to leader
* Leader gets work and starts again
*
* No attempt is made to recover missing/lost packets; if this happens there is a timeout and the leader tries again
*/

// Payload structs exchanged over the radio between leader and worker nodes.
// Each one is wrapped in `any(...)` before being sent, and recovered on the
// receiving side with `project ... onto`.

// Work handed from a leader to a worker: sender's node address, the leader's
// running work number, and the array of samples to process.
type workPacket is struct (unsigned id ; unsigned workNum ; integer[] work)
 
// Result returned from a worker: worker's node address, the work number being
// answered, and the computed answer (the maximum found after the transform).
type replyPacket is struct ( unsigned id ; unsigned workNum ; integer answer )
 
// Acknowledgement of a request: address of the node volunteering, plus a state
// flag (always sent as true by the code in this file).
type ackPacket is struct (unsigned id ; bool state)
 
// Broadcast request for a free worker: address of the requesting node.
type requestPacket is struct (unsigned id)
/*------------------------------------------------------------------*/
// Wraps a worker's answer in a RadioPacket.
//   id      - address of the node that computed the answer
//   dest    - address stored in the RadioPacket (the node the reply is for)
//   workNum - number of the work item being answered
//   ans     - the computed result
// Returns the RadioPacket carrying a replyPacket payload.
proc createReplyPacket (unsigned id ; unsigned dest ; unsigned workNum ; integer ans) : RadioPacket {
    body = new replyPacket(id, workNum, ans)
    pkt = new RadioPacket(dest, any(body))
    return pkt
}
/*------------------------------------------------------------------*/
// Wraps a batch of samples in a RadioPacket.
// Note the parameter order: dest comes BEFORE id here, unlike the other
// create*Packet procs.
//   dest    - address stored in the RadioPacket (the worker to send to)
//   id      - address of the node handing out the work
//   workNum - number identifying this work item
//   work    - the samples to be processed
// Returns the RadioPacket carrying a workPacket payload.
proc createWorkPacket (unsigned  dest ; unsigned id ; unsigned workNum ; integer[] work) : RadioPacket {
    body = new workPacket(id, workNum, work)
    pkt = new RadioPacket(dest, any(body))
    return pkt
}
/*------------------------------------------------------------------*/
// Builds the request sent when looking for a free worker.
//   id - address of the requesting node
// Returns a bare requestPacket (not a RadioPacket); callers in this file wrap
// it with any(...) and send it on the broadcast channel.
proc createRequestPacket (unsigned id ) : requestPacket {
    request = new requestPacket(id)
    return request
}
/*------------------------------------------------------------------*/
// Wraps an acknowledgement in a RadioPacket.
//   id    - address of the node acknowledging the request
//   dest  - address stored in the RadioPacket (the node that asked)
//   state - flag carried in the ackPacket
// Returns the RadioPacket carrying an ackPacket payload.
proc createAckPacket (unsigned id ; unsigned dest ;bool state ) : RadioPacket {
    body = new ackPacket(id, state)
    pkt = new RadioPacket(dest, any(body))
    return pkt
}
/*******************************************************************************/
// Channel interface presented by the leader component.
//   bcast       (out) - requests, sent as any(requestPacket), for broadcast
//   ucast       (out) - addressed RadioPackets (acks, work, replies)
//   dataRequest (out) - a `true` is sent here to ask for one sensor reading
//   dataReply   (in)  - the integer reading that answers a dataRequest
//   button      (in)  - button events; a press triggers the first request
//   ether       (in)  - RadioPackets received from the radio
//   timerChan   (in)  - timeout ticks for the timer armed with setTimer
type leaderI is interface(out any bcast; 
                          out RadioPacket ucast ;
                          out bool dataRequest ;
                          in integer dataReply ;
                          in integer button ;
						  in RadioPacket ether ;
                          in bool timerChan )
 
// Node component for the grid protocol. Despite its name it handles every
// packet type: it sends requests and work (leader role) and also answers
// requests and work packets that arrive from other nodes (worker role).
component leader presents leaderI{
 
    // NOTE(review): dataNum is not referenced anywhere else in this component.
    dataNum  = 0
    // Running work number: stamped into each work packet, then incremented.
    // NOTE(review): `$` presumably marks an unsigned literal - confirm.
    num = $0            
	// Buffer of 10 sensor readings, refilled before each work packet is sent
	data = new integer[10] of 0
	// Set to false on a button press; never read in this component.
	init = true
 
	// No construction-time work is needed.
	constructor(){
	}
 
	// One pass of the behaviour waits for exactly one of: a button press,
	// a radio packet, or a timer tick, and handles it.
	behaviour{
	

 			//printString("wait for packet or timeout\n");
			// Either receive a packet indicating a willing worker node, or timeout and request again
			select {
			
				// Button press: kick off the protocol with the first request.
				receive press from button : {
					//printString("LEADER: button press\n");
	            	init := false
	 
					// initial request for a worker
					req = createRequestPacket(getNodeAddress())
					// NOTE(review): this first timeout is 15.0 while every later one is 5.0;
					// units and the meaning of the `false` flag come from setTimer - confirm.
					setTimer(timerChan, 15.0, false)   
					send any(req) on bcast
				}
				
				receive packet from ether : {
					/* Packet is either 
					 *		reply from worker 	: collect sensor data and send
					 *		answer from worker 	: start again
					 *		request from leader	: reply with node id
					 *		work from leader	: fourier transform, find max, reply
					 *		unknown packet		: do nothing go round again and wait for packet or timeout
					 */
					project packet.payload as reading onto
					       // Worker role: transform the samples, find the maximum, send it back.
					       workPacket : {
					       		//printString("work\n");
								// The maximum is read from reading.work afterwards, so ifft is
								// expected to leave its output in that same array.
								ifft(reading.work, reading.work.length)
		                              // max starts at 0, so 0 is reported when no element exceeds 0.
		                              max = 0
		                              for i = 0 .. reading.work.length-1 do {
		                                  if reading.work[i] > max then {
		                                      max := reading.work[i]
		                                  }
		                              }
		                              // Reply goes to the node named in the work packet, echoing its work number.
		                              reply = createReplyPacket(getNodeAddress(),reading.id, reading.workNum, max)
		                              send reply on ucast
							}
							// Worker role: volunteer by acknowledging the requester.
							requestPacket : {
							//printString("request\n");
								ack = createAckPacket(getNodeAddress(), reading.id ,true )
		                        send ack on ucast
  						    }
						    // Leader role: a worker volunteered, so gather data and hand it over.
						    ackPacket : {
						    //printString("ack\n");
						       // Cancel the pending request timeout.
						       unsetTimer(timerChan)
 
								// Fill the buffer one reading at a time: request, then wait for the value.
								for i = 0 .. data.length - 1 do {
										send true on dataRequest
										receive voltage from dataReply
										data[i] := voltage
								}
 
								// Addressed to the node that sent the ack.
								work = createWorkPacket(reading.id, getNodeAddress(), num , data)
 
								num := num + $1
								// Re-arm the timeout so a lost work packet or reply triggers a new request.
								setTimer(timerChan, 5.0, false)
								send work on ucast
							}
							// Leader role: an answer came back; its value is not used here.
							replyPacket : {
							//printString("rinse and repeat\n");
								unsetTimer(timerChan)
 
								//start again
								req = createRequestPacket(getNodeAddress())
								setTimer(timerChan, 5.0, false)
								send any(req) on bcast
							}
							// Any other payload type is ignored.
							default : {}
				}
 
				//timeout: try again
				receive tick from timerChan : {
				//printString("timeout\n");
					req = createRequestPacket(getNodeAddress())
					setTimer(timerChan, 5.0, false)          
					send any(req) on bcast
				}
			}
		}
    }
/*****************************************************************************/
// Create the node's single component instance and wire its channels.
l = new leader()
 
// Button and radio channels.
connect l.button 		to buttonSensor.output
connect l.bcast 		to radio.broadcastSend
connect l.ucast 		to radio.unicastSend
connect l.ether 		to radio.received
 
// The "sensor data" is the battery reading: request on one channel, value back on the other.
// NOTE(review): l.timerChan is not connected here; presumably setTimer delivers
// ticks on the channel it is given - confirm.
connect l.dataRequest 	to sensors.batteryRequest
connect l.dataReply 	to sensors.batteryOut

The corresponding nesC program for TinyOS is shown below.

/* Top-level wiring for the grid application: connects every interface used
 * by gridC (aliased as App) to the component that provides it. */
configuration gridAppC {}
implementation {
  /* Application logic */
  components gridC as App;

  /* Boot sequence and LEDs */
  components MainC;
  components LedsC;

  /* Radio stack: control, plus one sender and one receiver for the
   * AM_RADIO_SENSE_MSG message type */
  components ActiveMessageC;
  components new AMSenderC(AM_RADIO_SENSE_MSG);
  components new AMReceiverC(AM_RADIO_SENSE_MSG);

  /* Timeout timer and sensor */
  components new TimerMilliC();
  components new DemoSensorC();

  /* Start-up and indicators */
  App.Boot -> MainC.Boot;
  App.Leds -> LedsC;

  /* Radio */
  App.RadioControl -> ActiveMessageC;
  App.AMSend -> AMSenderC;
  App.Packet -> AMSenderC;
  App.Receive -> AMReceiverC;

  /* Timer and sensor */
  App.MilliTimer -> TimerMilliC;
  App.Read -> DemoSensorC;
}


/**
 * Simple grid application. One node asks for slaves to do some work, either getting a reply and giving the work, or timing out and trying again.
 * 
 * @author Paul Harvey
 * @date   September 26 2011
 */

/* Module signature: the interfaces the grid logic relies on.
 * Timer and Read are generic interfaces and need their type arguments.
 * The timer is wired to TimerMilliC (millisecond precision), and the
 * readDone handler in this module receives a uint16_t sample. */
module gridC @safe(){
  uses {
    interface Leds;                          /* LED toggled on each received packet */
    interface Boot;                          /* start-up notification */
    interface Receive;                       /* incoming radio messages */
    interface AMSend;                        /* outgoing radio messages */
    interface Timer<TMilli> as MilliTimer;   /* request timeout */
    interface Packet;                        /* payload access for the send buffer */
    interface Read<uint16_t>;                /* sensor samples */
    interface SplitControl as RadioControl;  /* radio power control */
  }
}
implementation {

	/* Single outgoing message buffer; every send in this module builds its payload in it. */
  	message_t packet;
	/* work_count: next work number to stamp on a work packet.
	 * sensor_count: number of samples collected so far in the current batch. */
	int work_count = 0, sensor_count = 0; 
	/* Batch of sensor samples waiting to be packed into a work packet. */
	int sensor_data[10];
	/* current_work: work number of the job being processed by workTask.
	 * NOTE(review): start, count, stop and lock are not referenced by any code in this module. */
	int start = 0, count = 0, stop, lock = 0, current_work;
	/* NOTE(review): work_lock is not referenced by any code in this module. */
	int work_lock = 0;
	/* NOTE(review): pload is never assigned anywhere in this module. */
	void *pload;
	/* Samples of the job currently being processed; input to ifft in workTask. */
	int tarray[10];



	/* Transform routine taking real part, imaginary part and length; its
	 * definition is not part of this listing.
	 * NOTE(review): callers pass int arrays for the int16_t[] parameters - this
	 * only matches on platforms where int is 16 bits; confirm. */
   	void ifft(int16_t xre[], int16_t xim[], uint16_t n);
  /* Start-up: the only action is to power up the radio. The protocol itself
   * begins once RadioControl.startDone is signalled. */
  event void Boot.booted()
  {
    call RadioControl.start();
  }

  /**
   * Radio start-up has completed.
   *
   * If the radio failed to start, the start is requested again rather than
   * trying to send on a radio that is not running. Once the radio is up, the
   * leader (node id 1) broadcasts its first request for a worker and arms the
   * 5120 ms timeout; every other node simply waits for packets.
   *
   * @param err  SUCCESS if the radio started, an error code otherwise
   */
  event void RadioControl.startDone(error_t err) {
    request_packet_t* rp;

    if (err != SUCCESS) {
      /* Radio did not come up: try again. */
      call RadioControl.start();
      return;
    }

    /* Only the leader initiates the exchange. */
    if (TOS_NODE_ID != 1) {
      return;
    }

    rp = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
    if (rp == NULL) {
      /* Payload area too small for a request: nothing can be sent. */
      return;
    }

    rp->id = TOS_NODE_ID;
    call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));

    /* Timeout: MilliTimer.fired re-sends the request if no exchange completes. */
    call MilliTimer.startOneShot(5120);
  }

  /* Radio shutdown notification: nothing to do, since no code in this module
   * stops the radio. */
  event void RadioControl.stopDone(error_t err)
  {
  }
  
  /**
   * Timeout handler. On the leader (node id 1) it re-broadcasts the request
   * for a worker and re-arms the 5120 ms one-shot timer. On any other node
   * it does nothing.
   */
  event void MilliTimer.fired() {
    request_packet_t* req;

    /* Only the leader re-issues requests. */
    if (TOS_NODE_ID != 1) {
      return;
    }

    req = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
    if (req == NULL) {
      return;
    }

    req->id = TOS_NODE_ID;
    call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));
    call MilliTimer.startOneShot(5120);
  }
 /*--------------------------------------------------------------------------------*/
/**
 * Packs the collected batch of sensor samples into a work packet and sends
 * it. Posted by Read.readDone once the batch is full.
 *
 * The packet carries this node's id, the next work number (work_count is
 * incremented as a side effect) and the 10 samples from sensor_data.
 */
task void collectTask() {
	work_packet_t *wp;
	int i;

	wp = (work_packet_t*)call Packet.getPayload(&packet, sizeof(work_packet_t));
	if (wp == NULL) {
		/* Payload area too small for a work packet: nothing can be sent. */
		return;
	}

	wp->id = TOS_NODE_ID;
	wp->workNum = work_count++;
	wp->size = 10;

	/* Copy the whole batch into the outgoing payload. */
	for (i = 0; i < 10; i++) {
		wp->data[i] = sensor_data[i];
	}

	/* NOTE(review): the destination is fixed at node 0 rather than the id of
	 * the worker that acknowledged; the ack's sender id is not stored anywhere
	 * in this module. */
	call AMSend.send(0, &packet, sizeof(work_packet_t));
}
 /*--------------------------------------------------------------------------------*/
  /**
   * A sensor sample is ready. Samples are accumulated in sensor_data; as soon
   * as the 10th one has been stored the batch is handed to collectTask and
   * the counter is reset. Otherwise the next sample is requested.
   *
   * The batch is dispatched right after the 10th sample, so no extra read is
   * issued whose value would only be thrown away.
   *
   * @param result  status of the read
   * @param data    the sample value
   */
  event void Read.readDone(error_t result, uint16_t data) {
	/* NOTE(review): result is not checked, so a failed read is stored like a
	 * valid sample - confirm whether failed reads should be retried or dropped. */

	/* Bounds guard: never write past the end of the batch buffer. */
	if (sensor_count < 10) {
		sensor_data[sensor_count++] = data;
	}

	if (sensor_count < 10) {
		/* Batch not full yet: ask for the next sample. */
		call Read.read();
	}
	else {
		/* Batch complete: reset for the next round and send it off. */
		sensor_count = 0;
		post collectTask();
	}
  }
 /*--------------------------------------------------------------------------------*/
/**
 * Worker-side processing of one job. Runs ifft over the samples in tarray,
 * finds the largest resulting value and sends it to node 1 in a reply packet
 * tagged with current_work.
 *
 * The maximum starts at 0, so 0 is reported when no value exceeds 0.
 */
task void workTask() {
	reply_packet_t* rep;
	int temp[10], max = 0, i;

	/* The second ifft argument is the imaginary part. The samples are purely
	 * real, so it must be all zeros rather than uninitialised stack contents. */
	for (i = 0; i < 10; i++) {
		temp[i] = 0;
	}

	/* Transform; the result is read back from tarray below. */
	ifft(tarray, temp, 10);

	/* Find the maximum of the transformed data. */
	for (i = 0; i < 10; i++) {
		if (tarray[i] > max)
			max = tarray[i];
	}

	/* Build and send the reply. */
	rep = (reply_packet_t*)call Packet.getPayload(&packet, sizeof(reply_packet_t));
	if (rep == NULL) {
		/* Payload area too small for a reply: do not dereference a null pointer. */
		return;
	}

	rep->id = TOS_NODE_ID;
	rep->workNum = current_work;
	rep->ans = max;

	/* Node 1 is the leader throughout this module. */
	call AMSend.send(1, &packet, sizeof(reply_packet_t));
	call MilliTimer.startOneShot(5120);
}
 /*--------------------------------------------------------------------------------*/

  /**
   * Handles every incoming radio message. The message type is inferred from
   * the payload length alone:
   *   request -> stop the timer and unicast an ack to the requester
   *   ack     -> start collecting sensor samples (leads to a work packet)
   *   work    -> copy the samples and post workTask to process them
   *   reply   -> stop the timer and broadcast a fresh request
   * Anything else is ignored.
   *
   * NOTE(review): this dispatch relies on the four packet structs all having
   * different sizes; their definitions are not part of this listing.
   *
   * @param bufPtr   the received message buffer
   * @param payload  pointer to the message payload
   * @param len      payload length in bytes
   * @return always bufPtr: the handler must hand a valid buffer back to the
   *         radio stack, including on the early-exit paths
   */
  event message_t* Receive.receive(message_t* bufPtr, void* payload, uint8_t len) {
	request_packet_t* rp;
	ack_packet_t* ap;
	work_packet_t* wp;
	int i;

	call Leds.led1Toggle();

	/* request -> ack */
	if(len == sizeof(request_packet_t)) {
		printf("Got Request\n");printfflush();

		call MilliTimer.stop();
		ap = (ack_packet_t*)call Packet.getPayload(&packet, sizeof(ack_packet_t));
		if (ap == NULL) {
			return bufPtr;
		}

		ap->id = TOS_NODE_ID;
		ap->state = 1;

		/* Reply directly to the node that asked. */
		call AMSend.send(((request_packet_t*)payload)->id, &packet, sizeof(ack_packet_t));
	}

	/* ack -> work: sampling starts here; Read.readDone continues the chain. */
	if(len == sizeof(ack_packet_t)) {
		call Read.read();
	}

	/* work -> reply */
	if(len == sizeof(work_packet_t)) {
		wp = (work_packet_t*)payload;

		/* Copy the samples out of the receive buffer, which goes back to the
		 * radio stack when this handler returns. */
		for (i = 0; i < 10; i++) {
			tarray[i] = wp->data[i];
		}

		/* Read the work number from the received payload. */
		current_work = wp->workNum;
		post workTask();
	}

	/* reply -> request */
	if(len == sizeof(reply_packet_t)) {
		call MilliTimer.stop();

		/* Start again */
		rp = (request_packet_t*)call Packet.getPayload(&packet, sizeof(request_packet_t));
		if (rp == NULL) {
			return bufPtr;
		}

		rp->id = TOS_NODE_ID;
		call AMSend.send(AM_BROADCAST_ADDR, &packet, sizeof(request_packet_t));
		call MilliTimer.startOneShot(5120);
	}

	return bufPtr;
  }

  /* Send completion is ignored: neither the buffer nor the status is
   * inspected, and lost packets are handled by the timeout instead. */
  event void AMSend.sendDone(message_t* bufPtr, error_t error)
  {
  }
}