#include #include #include #include #include #include #include //#include "../../status.h" #include "../../feeder.h" typedef struct import_item_t { int imported; char filename[256]; struct import_item_t * next; } import_item_t; struct import_item_t * import_list; typedef struct measurement_t { unsigned long unit_id; unsigned int sensor_id; time_t time; double value; struct measurement_t * next; } measurement_t; struct measurement_t * measurement_list; static CURL *curl_g; static void (* feederLog)(int priority, const char * fmt, ...); static void * feederLog_temp; int setLog(void * func) { feederLog = (void(*)(int, const char * , ...))func; feederLog_temp = feederLog; return 0; } void addToImport(char * filename) { struct import_item_t * item, * last_item; item = malloc(sizeof(struct import_item_t)); if (item) { strncpy(item->filename, filename, 255); item->imported = 0; item->next = NULL; if (import_list == NULL) import_list = item; else { last_item = import_list; while (last_item->next != NULL) last_item = last_item->next; last_item->next = item; } } } void freeImportList(void) { struct import_item_t * delete_item, * last_item; if (import_list == NULL) return; last_item = import_list; while (last_item->next != NULL) { delete_item = last_item; free(delete_item); last_item = last_item->next; } free(last_item); import_list = NULL; } static void parseListLine(char * filename) { char * f; if (strstr(filename, "imported") == NULL) { f = filename + strlen(filename); while ((*(f - 1) != ' ') && (f > filename)) f--; feederLog(LOG_DEBUG, "Adding file to be imported: %s\n", f); addToImport(f); } } static size_t ftpWriteList(void * buffer, size_t size, size_t nmemb, void * data) { static char filename[256]; static size_t pos = 0; size_t i; void (* line_func)(char *); line_func = data; for (i = 0; i < size * nmemb; i++) { if (((char *)buffer)[i] == '\n') { filename[pos] = '\0'; if (data != NULL) line_func(filename); pos = 0; } else filename[pos++] = ((char *)buffer)[i]; } // printf("ftpWrite()\n"); // printf("Data readen: \n%s \n", (char *)buffer); return nmemb * size; } void addMeasurement(unsigned long unit_id, unsigned long sensor_id, time_t tp, double value) { struct measurement_t * measurement, * last_measurement; measurement = malloc(sizeof(struct measurement_t)); if (measurement) { measurement->unit_id = unit_id; measurement->sensor_id = sensor_id; measurement->time = tp; measurement->value = value; measurement->next = NULL; if (measurement_list == NULL) measurement_list = measurement; else { last_measurement = measurement_list; //todo: optimalizaci (pamatovat si posledniho clena), aby nemusel vzdycky prochazet celej list while (last_measurement->next != NULL) last_measurement = last_measurement->next; last_measurement->next = measurement; } } } unsigned long sensor_id_table[] = { 00000, 54002, 54001, 55001, 00000, 34032, 00000, 00000, 00000, 36012, 36013, 56001, 00000, 37003, 34015, 00000 }; static void parseFileLine(char * line) { char * sep, * s; int i; unsigned long id; unsigned int ack_channel; time_t ack_time; double value; struct tm ack_time_tm; if (line[0] == '#') //this is comment line, skip it return; s = line; i = 0; ack_time = 0; ack_channel = 0; id = 0; while (sep = strchr(s, '\t'), sep != NULL) { *sep = 0; switch (i) { case 0: id = atol(s); break; case 1: ack_channel = atoi(s); break; case 2: sscanf(s, "%d-%d-%d %d:%d:%d", &ack_time_tm.tm_year, &ack_time_tm.tm_mon, &ack_time_tm.tm_mday, &ack_time_tm.tm_hour, &ack_time_tm.tm_min, &ack_time_tm.tm_sec); ack_time_tm.tm_year -= 1900; ack_time_tm.tm_mon -= 1; ack_time = mktime(&ack_time_tm); break; } s = sep + 1; i++; } sep = strchr(s, ','); if (sep != NULL) *sep = '.'; value = atof(s); if (sensor_id_table[ack_channel] != 0) addMeasurement(id, sensor_id_table[ack_channel] * 10000, ack_time, value); // printf("Line: id: %ld, channel: %d, time: %ld, value: %0.2f\n", id, ack_channel, ack_time, value); } CURLcode ftpImport(CURL * curl, char * path) { CURLcode res; struct import_item_t * item; char * full_filename; res = 0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseFileLine); item = import_list; while (item != NULL) { full_filename = malloc(strlen(path) + strlen(item->filename) + 1); strcpy(full_filename, path); strcpy(full_filename + strlen(full_filename), item->filename); feederLog(LOG_DEBUG, "fiedler: full filename %s\n", full_filename); curl_easy_setopt(curl, CURLOPT_URL, full_filename); curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234"); res = curl_easy_perform(curl); free(full_filename); item->imported = 1; item = item->next; } curl_easy_setopt(curl, CURLOPT_URL, path); return res; } CURLcode ftpMarkImported(CURL * curl) { struct import_item_t * item; char req[512]; char * temp; CURLcode res; curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, NULL); curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL); item = import_list; while (item != NULL) { if (item->imported) { sprintf(req, "RNFR %s", item->filename); curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req); curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234"); res = curl_easy_perform(curl); if (res != 0) feederLog(LOG_ERR, "fiedler: curl RNFR error %d\n", res); else feederLog(LOG_DEBUG, "fiedler: Renamed from %s\n", item->filename); temp = strchr(item->filename, '.'); temp[0] = 0; strcpy(item->filename + strlen(item->filename), "_imported.txt"); sprintf(req, "RNTO %s", item->filename); curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req); curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234"); res = curl_easy_perform(curl); if (res != 0) feederLog(LOG_ERR, "fiedler: curl RNTO error %d\n", res); else feederLog(LOG_DEBUG, "fiedler: Renamed to %s\n", item->filename); } item = item->next; } freeImportList(); return 0; } CURLcode ftpList(CURL * curl) { curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, ftpWriteList); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseListLine); curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234"); return curl_easy_perform(curl); } unsigned int timeout(void * lib_data, int sock, unsigned char * data) { CURLcode res; char path[1024]; (void)lib_data; (void)sock; (void)data; feederLog = feederLog_temp;//todo:proc neudrzi feederLog promennou if (curl_g) { strcpy(path, "ftp://fiedler:Yeen0iFa@localhost/DATA/"); curl_easy_setopt(curl_g, CURLOPT_URL, path); res = ftpList(curl_g); if (import_list == NULL) feederLog(LOG_DEBUG, "fiedler: Nothing to import\n"); res = ftpImport(curl_g, path); res = ftpMarkImported(curl_g); } (void)res; return 0; } unsigned int process(void *lib_data, int sock, unsigned char *data, unsigned int length, unsigned long long int *id, time_t * tm, double *result_array, uint64_t * sensors, unsigned int *type) { (void)lib_data; (void)sock; (void)data; (void)length; struct measurement_t * measurement; if (measurement_list == NULL) return 0; measurement = measurement_list; result_array[0] = measurement->value; sensors[0] = measurement->sensor_id; *id = measurement->unit_id; *tm = measurement->time; *type = VALUES_TYPE_OBS; measurement_list = measurement->next; free(measurement); return 1; } int curlDebug(CURL * curl, curl_infotype info, char * string, size_t size, void * data) { (void)curl; (void)data; if (feederLog != NULL) { if (info == CURLINFO_TEXT) { string[size] = 0; feederLog(LOG_DEBUG, "fiedler: %s", string); } } return 0; } int init(void *param) { feederLog = NULL; param = param; curl_global_init(CURL_GLOBAL_DEFAULT); printf("Curl version: %s \n", curl_version()); curl_g = curl_easy_init(); curl_easy_setopt(curl_g, CURLOPT_DEBUGFUNCTION, curlDebug); import_list = 0; measurement_list = 0; return 0; } /* to test only: int main(void) { unsigned long long int id; time_t tp; double values[64]; uint64_t sensors[64]; unsigned int data_type; // timeout(NULL, 0, NULL); while (process(NULL, 0, NULL, 0, &id, &tp, values, sensors, &data_type) > 0) { printf("Zaznam: id %lld, %lld, %f\n", id, sensors[0], values[0]); } return 0; } */