


/*
 * ensure all of data on socket comes through. f==read || f==write
 */

// http://awgn.antifork.org/codes/usr8550.c
/*
ssize_t
atomicio(f, fd, _s, n)
ssize_t(*f) ();
	int fd;
	void *_s;
	size_t n;
{
	char *s = _s;
	ssize_t res, pos = 0;

	while (n > pos) {
		res = (f) (fd, s + pos, n - pos);
		switch (res) {
		case -1:
			if (errno == EINTR || errno == EAGAIN)
				continue;
			else
				fatal("atomicio(): %s\n", strerror(errno));
		case 0:
			return (res);
		default:
			pos += res;
		}
	}
	return (pos);
}
*/


int setnonblock(SOCKET sock,int to)
{
#ifdef WIN32
	setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,(char*) &to, sizeof(int));
#else
	struct timeval timeout;
	
	timeout.tv_sec = to/1000;
	timeout.tv_usec=0;
	
	if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,(char*) &timeout, sizeof(timeout)))
		return 0;
#endif
	return 1;
}


int StartUpWinsock()
{
#ifdef WIN32
	WSADATA wsadata;
	
	if((WSAStartup(MAKEWORD(1,1),&wsadata))!=0 || (LOBYTE(wsadata.wVersion )!=1 || HIBYTE(wsadata.wVersion)!=1))
		return 0;
#endif
	return 1;
}

void EmptySocketBuffer(SOCKET sock)
{
char* msg;
    msg = malloc(100000);

    recv(sock,msg,100000,0);

    FREE(msg);
}

int ConnectToAllSubNodes()
{
unsigned int i;

    StartUpWinsock();

    for(i=0; i<subNodesStruct.saddrElements; i++)
    {
        subNodesStruct.socks[i] = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);

	    if ( connect( (SOCKET)subNodesStruct.socks[i], (LPSOCKADDR) &subNodesStruct.saddrSubNodes[i], sizeof(SOCKADDR_IN) ) == SOCKET_ERROR)
        {
            printf("Node %d: Failed\n",i+1);
            subNodesStruct.isConnected[i]=0;
            closesocket(subNodesStruct.socks[i]);
        }
        else
        {
            printf("Node %d: OK\n",i+1);
            subNodesStruct.isConnected[i]=1;
            setnonblock(subNodesStruct.socks[i], 50);

			/*
			{
				int nonblocking = 1;
            	ioctlsocket(subNodesStruct.socks[i], FIONBIO, &nonblocking);
			}
			*/
        }
    }

    return 1;
}

/* ricerca OOI: invia a tutti nodi l'array delle parole che formano la query */
void SendAMessageToAllNodesOOI(unsigned int* query, int words)
{
unsigned int i;
int res;
int* msg;
    
    /* alloca il messaggio che invieremo della lunghezza dell'array + il primo 
       valore del pacchetto che identifica la lunghezza 
       PROTOCOLLO: | b | numero parole | word1 | word2 | ... | wordN | 
     */
    msg = malloc(sizeof(int)*(words+2));
    msg[0] = words;
    memcpy(msg+1,query,sizeof(int)*words);

    for(i=0; i<subNodesStruct.saddrElements; i++)
	{
        if(subNodesStruct.isConnected[i])
        {
            res = SEND(subNodesStruct.socks[i], "b");

            /* un nodo si è sconnesso */
            if(res == -1)
            {
                printf("\nNode %d: is now DOWN\n",i+1);
                subNodesStruct.isConnected[i] = 0;
                continue;
            }


            res = SEND_BIN(subNodesStruct.socks[i], msg, sizeof(int)*(words+1));
            
            /* un nodo si è sconnesso */
            if(res == -1)
            {
                printf("\nNode %d: is now DOWN\n",i+1);
                subNodesStruct.isConnected[i] = 0;
                continue;
            }
        }
	}

	FREE(msg);

}

void SendAMessageToAllNodesMysql(char* query)
{
unsigned int i;
int res;
int* msg;
    
    /* alloca il messaggio che invieremo della lunghezza della query + la lunghezza della query in bytes
       PROTOCOLLO: | t | lunghezza | query | 
     */
    msg = malloc( strlen(query) + sizeof(int) );
    msg[0] = strlen(query);
    memcpy(msg+1,query,strlen(query) );

    for(i=0; i<subNodesStruct.saddrElements; i++)
	{
        if(subNodesStruct.isConnected[i])
        {
            res = SEND(subNodesStruct.socks[i], "t");

            /* un nodo si è sconnesso */
            if(res == -1)
            {
                printf("\nNode %d: is now DOWN\n",i+1);
                subNodesStruct.isConnected[i] = 0;
                continue;
            }


            res = SEND_BIN(subNodesStruct.socks[i], msg, sizeof(int) + strlen(query) );
            
            /* un nodo si è sconnesso */
            if(res == -1)
            {
                printf("\nNode %d: is now DOWN\n",i+1);
                subNodesStruct.isConnected[i] = 0;
                continue;
            }
        }
	}

	FREE(msg);

}

void CreateRecvMessagesThread()
{

    /* inizializza la struttura che riceve le query e i risultati */
    InitQueryBuf(1, -1);

    /* se non c'è nessun sotto-nodo: non creare 
           il thread di ricevimento messaggi */
    if(subNodesStruct.saddrElements==0)
        return ;

#ifdef WIN32
	_beginthreadex(NULL,0,RecvAMessages,(void*)NULL,0,NULL);
#else
int errorCode;
pthread_t ptTmp;
    if( (errorCode=pthread_create(&ptTmp, NULL, RecvAMessages, (void*)NULL)) != 0 )
    {
            printf("\r\nThread error (%i):\r\n",errorCode);
            perror(" -    pthread_create() ");
            exit(0);
    }
#endif

}

/* crea un thread che ogni tot secondi controlla che
un sotto-nodo non connesso sia diventato attivo*/
void CreateCheckSubNodesThread()
{
    /* se non c'è nessun sotto-nodo: non creare 
           il thread di controllo */
    if(subNodesStruct.saddrElements==0)
        return ;

#ifdef WIN32
	_beginthreadex(NULL,0,CheckSubNodes,(void*)NULL,0,NULL);
#else
int errorCode;
pthread_t ptTmp;
    if( (errorCode=pthread_create(&ptTmp, NULL, CheckSubNodes, (void*)NULL)) != 0 )
    {
            printf("\r\nThread error (%i):\r\n",errorCode);
            perror(" -    pthread_create() ");
            exit(0);
    }
#endif

}

/* InitQueryBuf
 *   inizializza la struttura di gestione delle query e dei risultati
 *     se slot == -1 inizializza tutto
 *     altrimenti    inizializza solo lo slot specificato
 */
void InitQueryBuf(int full, int slot)
{
int i,c;

    /* inizializza la struttura per la prima esecuzione */
    if(full==1)
    {
        if(slot >= 0)
        {
            queryStt[slot].nWords = 0;
            queryStt[slot].rkRes = NULL;
            queryStt[slot].nRkRes = 0;
        }
        else
        {
            for(i=0;i<MAXPARRSEARCH;i++)
            {
                queryStt[i].nWords = 0;
                queryStt[i].rkRes = NULL;
                queryStt[i].nRkRes = 0;
            }
        }
    }

/* reinit results and query */
    if(slot >= 0)
    {
        queryStt[slot].available = 1;
        queryStt[slot].nWords = 0;
        queryStt[slot].nRkRes = 0;
        FREE(queryStt[slot].rkRes);
        queryStt[slot].rkRes = NULL;

        for(c=0; c<subNodesStruct.saddrElements; c++)
            queryStt[slot].NodeHasSent[c] = 0;
    }
    else
    {
        for(i=0;i<MAXPARRSEARCH;i++)
        {
            queryStt[i].available = 1;
            queryStt[i].nWords = 0;
            queryStt[i].nRkRes = 0;
            FREE(queryStt[i].rkRes);
            queryStt[i].rkRes = NULL;
            
            for(c=0; c<subNodesStruct.saddrElements; c++)
                queryStt[i].NodeHasSent[c] = 0;
        }
    }

}

/* questa funzione riceve in loop i messaggi con i risultati dai sotto-nodi
   (funzione eseguita in un thread separato) */
#ifdef WIN32
unsigned __stdcall 
#else
void* 
#endif
RecvAMessages(void* __nnnn__)
{
unsigned int i;
int rec;
char packetType[2];
int nWords;
int* msg			= NULL;
char* text_query	= NULL;
int nResults;
RANKS* results;
int QuerySlot;

    while(1)
    {
        /* se non c'è nessuna query in coda non ricevere messaggi */
        if(ActiveQuery()==0)
        {
            Sleep(SLEEP_DELAY1);
            continue;
        }

        for(i=0; i<subNodesStruct.saddrElements; i++)
        {
            /* cicla tutti i sotto-nodi connessi e prova a ricevere un messaggio completo 
               da ognuno di loro! se non riesce nel tempo dato svuota tutto e passa 
               al successivo
             */
            rec=0;
            if(subNodesStruct.isConnected[i])
            {
                /* inizio: ricezione messaggio */
                /* ricevo il primo byte contenente il tipo di pacchetto */
                if(recv(subNodesStruct.socks[i],(char*)&packetType,1,0)==1)
                {
                    /* se il pacchetto non è binario(ooi(b) o mysql(t)), continua il ciclo dall'inizio */
                    if(packetType[0]!='b' && packetType[0]!='t')
                    {
                        /* svuoto il buffer del socket funzionerà?!? */
                        EmptySocketBuffer(subNodesStruct.socks[i]);
                        continue;
                    }
                }
                else
                    continue;

				
				msg			= NULL;
				text_query	= NULL;

				
				/* pacchetto binario da nodo ooi */
				if(packetType[0] == 'b')
				{

					/* ricevo un byte con il numero di parole della query */
					if(recv(subNodesStruct.socks[i],(char*)&nWords,sizeof(int),0)==sizeof(int))
					{
						/* se bufIn è 0 il pacchetto non è corretto */
						if(nWords<=0)
						{
							EmptySocketBuffer(subNodesStruct.socks[i]);
							continue;
						}

						msg = malloc(nWords*sizeof(int));

						/* se ricevi correttamente la query */
						if(recv(subNodesStruct.socks[i],(char*)msg,nWords*sizeof(int),0)==nWords*sizeof(int))
						{
							/* ricevi il numero risultati */
							if(recv(subNodesStruct.socks[i],(char*)&nResults,sizeof(int),0)==sizeof(int))
							{
								/* se nResults è minore di 0 il pacchetto non è corretto */
								if(nResults<0)
								{
									EmptySocketBuffer(subNodesStruct.socks[i]);
									FREE(msg);
									continue;
								}

								/* se ci sono risultati */
								if(nResults>0)
								{
									/* alloca una riga in più in quanto la struttura dei risultati
									   prevede che l'ultimo elemento sia a 0 */
									results = malloc((nResults+1)*sizeof(RANKS));

									/* se ricevi correttamente i risultati */
									if(recv(subNodesStruct.socks[i],(char*)results,nResults*sizeof(RANKS),0)==nResults*sizeof(RANKS))
									{
										rec = 1;

										/* setta l'ultimo risultato a 0 */
										/* da controllare se funziona */
										results[nResults].page = 0;
									}
									else
									{
										EmptySocketBuffer(subNodesStruct.socks[i]);
										FREE(msg);
										FREE(results);
										continue;
									}
								}
								else
								{
									results = NULL;
									rec = 1;
								}
							}
						}
						else
						{
							EmptySocketBuffer(subNodesStruct.socks[i]);
							FREE(msg);
							continue;
						}
					}
					/*fine: ricezione messaggio*/
				}
				else if( packetType[0] == 't' )
				{
					
					/* ricevo un byte con il numero di caratteri della query ( max 255 ) */
					if(recv(subNodesStruct.socks[i],(char*)&nWords,sizeof(int),0)==sizeof(int))
					{
						/* se bufIn è 0 il pacchetto non è corretto */
						if(nWords<=0)
						{
							EmptySocketBuffer(subNodesStruct.socks[i]);
							continue;
						}

						text_query = malloc( nWords + 1 );

						/* se ricevi correttamente la query */
						if(recv(subNodesStruct.socks[i],(char*)text_query,nWords,0)==nWords)
						{
							
							text_query[nWords] = 0;

							/********************* codice condiviso *******************/
							/* ricevi il numero risultati */
							if(recv(subNodesStruct.socks[i],(char*)&nResults,sizeof(int),0)==sizeof(int))
							{
								
								/* se nResults è minore di 0 il pacchetto non è corretto */
								if(nResults<0)
								{
									EmptySocketBuffer(subNodesStruct.socks[i]);
									FREE(text_query);
									continue;
								}

								/* se ci sono risultati */
								if(nResults>0)
								{
									/* alloca una riga in più in quanto la struttura dei risultati
									   prevede che l'ultimo elemento sia a 0 */
									results = malloc((nResults+1)*sizeof(RANKS));

									/* se ricevi correttamente i risultati */
									if(recv(subNodesStruct.socks[i],(char*)results,nResults*sizeof(RANKS),0)==nResults*sizeof(RANKS))
									{
										rec = 1;

										/* setta l'ultimo risultato a 0 */
										/* da controllare se funziona */
										results[nResults].page = 0;
									}
									else
									{
										EmptySocketBuffer(subNodesStruct.socks[i]);
										FREE(text_query);
										FREE(results);
										continue;
									}
								}
								else
								{
									results = NULL;
									rec = 1;
								}
								/********************* codice condiviso *******************/
							}

						}
						else
						{
							EmptySocketBuffer(subNodesStruct.socks[i]);
							FREE(msg);
							continue;
						}
					}
				}

                /*inizio: concatenazione messaggio*/
                if( rec == 1 )
                {
                    
					if( packetType[0] == 'b' )
						QuerySlot=GetQuerySlotByText(msg, nWords);
					else
						QuerySlot=GetQuerySlotByTextQuery(text_query);

                    /* se c'è la query nella struttura */
                    if( QuerySlot >= 0 )
                    {
                        queryStt[QuerySlot].rkRes = mergeResults(queryStt[QuerySlot].rkRes, queryStt[QuerySlot].nRkRes, results, nResults);
                        queryStt[QuerySlot].nRkRes = queryStt[QuerySlot].nRkRes + nResults;

                        /* ordina i risultati per rank */
                        insertion_sortRanks(queryStt[QuerySlot].rkRes, queryStt[QuerySlot].nRkRes, 1, 1);

                        /* limita i risultati a MAXRESULTSxNODE */
                        if(queryStt[QuerySlot].nRkRes > MAXRESULTSxNODE)
                        {
                            queryStt[QuerySlot].rkRes = limitResults(queryStt[QuerySlot].rkRes, MAXRESULTSxNODE);
                            queryStt[QuerySlot].nRkRes = MAXRESULTSxNODE;
                        }

                        queryStt[QuerySlot].NodeHasSent[i] = 1;
                    }

                    //FREE(query);
                    FREE(results);
                    FREE(msg);
					FREE(text_query);
                }
                /*fine: concatenazione messaggio*/

            }
            /* if(subNodesStruct.isConnected[i]) */
            else
                /* c'è un nodo non connesso */
                Sleep(SLEEP_DELAY1);

            

        }/* end for */

       // Sleep(SLEEP_DELAY1);
    }

    /* end-less loop */
}


#ifdef WIN32
unsigned __stdcall 
#else
void* 
#endif
CheckSubNodes(void* __nnnn__)
{
int i;

    while(1)
    {
        Sleep(CHECK_INTERVAL);

        for(i=0; i<subNodesStruct.saddrElements; i++)
        {

            /* se il nodo non è connesso: prova a connetterti */
            if(subNodesStruct.isConnected[i]==0)
            {
                subNodesStruct.socks[i] = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
                if ( connect( (SOCKET)subNodesStruct.socks[i], (LPSOCKADDR) &subNodesStruct.saddrSubNodes[i], sizeof(SOCKADDR_IN) ) == 0 )
                {
                    printf("\nNode %d: is now UP\n",i+1);
                    /* problema: se faccio una query 
                       e la invio a tutti i sotto-nodi e il sotto-nodo che
                       sto controllando si connette; alla ricezione mi
                       aspetto un risultato anche da questo sotto-nodo */
                    subNodesStruct.isConnected[i]=1;

                    setnonblock(subNodesStruct.socks[i], 50);
                }
                else
                    closesocket(subNodesStruct.socks[i]);
            }

        }
    }

    /* end-less loop */
}

/*old*/
/*
char* RecvAMessageFromAllNodes()
{
unsigned int i;
int rec;
int x;
char bufIn[2];
char msg[MAXRESULTSSIZE];
MYCSTR results;

    results.myString = NULL;
    results.myString = myCStrCpy(&results,"");

    for(i=0; i<subNodesStruct.saddrElements; i++)
        if(subNodesStruct.isConnected[i])
        {
            x = 0;
            memset(msg,0,MAXRESULTSSIZE);
            rec = 1;

            while(rec && x<MAXRESULTSSIZE)
            {
                if(recv(subNodesStruct.socks[i],bufIn,1,0)==1)
	            {
                    if(bufIn[0]=='\r' || bufIn[0]=='\n' || bufIn[0]=='\0')
                        rec = 0;
                    else
		                msg[x++] = bufIn[0];
		        }
                else
                    rec = 0;
            }
            results.myString = myCStrCat(&results,msg);
        }

    return results.myString;
}
*/
