| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- #include <stdio.h>
- #include <stdint.h>
- #include <stdlib.h>
- #include <string.h>
- #include <syslog.h>
- #include <stdarg.h>
- #include <curl/curl.h>
- //#include "../../status.h"
- #include "../../feeder.h"
/* One remote file found by the FTP directory listing, kept in a singly linked list. */
typedef struct import_item_t import_item_t;

struct import_item_t
{
    int imported;           /* set to 1 by ftpImport(), checked by ftpMarkImported() */
    char filename[256];     /* file name as taken from the listing line */
    import_item_t * next;   /* next list entry, NULL at the tail */
};

/* Head of the list of files waiting to be imported; NULL when the list is empty. */
import_item_t * import_list;
/* One parsed measurement queued for delivery through process(). */
typedef struct measurement_t measurement_t;

struct measurement_t
{
    unsigned long unit_id;      /* first column of the data file line */
    unsigned int sensor_id;     /* sensor identifier derived from the channel number */
    time_t time;                /* timestamp of the record */
    double value;               /* measured value */
    measurement_t * next;       /* next queued measurement, NULL at the tail */
};

/* Head of the queue of parsed measurements; NULL when the queue is empty. */
measurement_t * measurement_list;
- static CURL *curl_g;
- static void (* feederLog)(int priority, const char * fmt, ...);
- static void * feederLog_temp;
- int setLog(void * func)
- {
- feederLog = (void(*)(int, const char * , ...))func;
- feederLog_temp = feederLog;
- return 0;
- }
- void addToImport(char * filename)
- {
- struct import_item_t * item, * last_item;
-
- item = malloc(sizeof(struct import_item_t));
- if (item)
- {
- strncpy(item->filename, filename, 255);
- item->imported = 0;
- item->next = NULL;
- if (import_list == NULL)
- import_list = item;
- else
- {
- last_item = import_list;
- while (last_item->next != NULL)
- last_item = last_item->next;
- last_item->next = item;
- }
- }
- }
- void freeImportList(void)
- {
- struct import_item_t * delete_item, * last_item;
- if (import_list == NULL)
- return;
- last_item = import_list;
- while (last_item->next != NULL)
- {
- delete_item = last_item;
- free(delete_item);
- last_item = last_item->next;
- }
- free(last_item);
- import_list = NULL;
- }
/*
 * Handle one line of the FTP directory listing.
 *
 * Lines containing the text "imported" are skipped. For any other line the
 * text after the last space (the whole line if it has no space) is taken as
 * the file name and queued with addToImport().
 */
static void parseListLine(char * filename)
{
    char * f;

    if (strstr(filename, "imported") != NULL)
        return;

    /* Scan backwards from the terminator; check the bound BEFORE reading f[-1]. */
    f = filename + strlen(filename);
    while ((f > filename) && (*(f - 1) != ' '))
        f--;

    feederLog(LOG_DEBUG, "Adding file to be imported: %s\n", f);
    addToImport(f);
}
/*
 * libcurl write callback that splits the incoming byte stream into lines.
 *
 * buffer, size, nmemb: received chunk of size * nmemb bytes (not NUL-terminated).
 * data: function of type void (*)(char *) called once per completed line with
 *       the line text (without the '\n'); may be NULL, then lines are discarded.
 *
 * A partial line is kept in a static buffer between calls. Lines longer than
 * the buffer are truncated instead of overflowing it.
 *
 * Returns size * nmemb, i.e. always reports the whole chunk as consumed.
 */
static size_t ftpWriteList(void * buffer, size_t size, size_t nmemb, void * data)
{
    static char filename[256];
    static size_t pos = 0;
    const char * bytes = buffer;
    const size_t total = size * nmemb;
    size_t i;
    void (* line_func)(char *);

    line_func = data;
    for (i = 0; i < total; i++)
    {
        if (bytes[i] == '\n')
        {
            filename[pos] = '\0';
            if (data != NULL)
                line_func(filename);
            pos = 0;
        }
        else if (pos < sizeof filename - 1)
        {
            /* Always leave room for the terminating NUL. */
            filename[pos++] = bytes[i];
        }
        /* else: line too long, drop the excess bytes */
    }

    return total;
}
- void addMeasurement(unsigned long unit_id, unsigned long sensor_id, time_t tp, double value)
- {
- struct measurement_t * measurement, * last_measurement;
-
- measurement = malloc(sizeof(struct measurement_t));
- if (measurement)
- {
- measurement->unit_id = unit_id;
- measurement->sensor_id = sensor_id;
- measurement->time = tp;
- measurement->value = value;
- measurement->next = NULL;
- if (measurement_list == NULL)
- measurement_list = measurement;
- else
- {
- last_measurement = measurement_list; //todo: optimalizaci (pamatovat si posledniho clena), aby nemusel vzdycky prochazet celej list
- while (last_measurement->next != NULL)
- last_measurement = last_measurement->next;
- last_measurement->next = measurement;
- }
- }
- }
/*
 * Lookup table indexed by the channel number read from a data file line.
 * A zero entry means that the channel is not imported; a non-zero entry is
 * multiplied by 10000 in parseFileLine() to form the sensor id.
 */
unsigned long sensor_id_table[] =
{
    /* channels  0.. 3 */ 0,     54002, 54001, 55001,
    /* channels  4.. 7 */ 0,     34032, 0,     0,
    /* channels  8..11 */ 0,     36012, 36013, 56001,
    /* channels 12..15 */ 0,     37003, 34015, 0
};
- static void parseFileLine(char * line)
- {
- char * sep, * s;
- int i;
- unsigned long id;
- unsigned int ack_channel;
- time_t ack_time;
- double value;
- struct tm ack_time_tm;
- if (line[0] == '#') //this is comment line, skip it
- return;
- s = line;
- i = 0;
- ack_time = 0;
- ack_channel = 0;
- id = 0;
- while (sep = strchr(s, '\t'), sep != NULL)
- {
- *sep = 0;
- switch (i)
- {
- case 0:
- id = atol(s);
- break;
- case 1:
- ack_channel = atoi(s);
- break;
- case 2:
- sscanf(s, "%d-%d-%d %d:%d:%d", &ack_time_tm.tm_year, &ack_time_tm.tm_mon, &ack_time_tm.tm_mday, &ack_time_tm.tm_hour, &ack_time_tm.tm_min, &ack_time_tm.tm_sec);
- ack_time_tm.tm_year -= 1900;
- ack_time_tm.tm_mon -= 1;
- ack_time = mktime(&ack_time_tm);
- break;
- }
- s = sep + 1;
- i++;
- }
- sep = strchr(s, ',');
- if (sep != NULL)
- *sep = '.';
- value = atof(s);
- if (sensor_id_table[ack_channel] != 0)
- addMeasurement(id, sensor_id_table[ack_channel] * 10000, ack_time, value);
- // printf("Line: id: %ld, channel: %d, time: %ld, value: %0.2f\n", id, ack_channel, ack_time, value);
- }
- CURLcode ftpImport(CURL * curl, char * path)
- {
- CURLcode res;
- struct import_item_t * item;
- char * full_filename;
- res = 0;
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseFileLine);
- item = import_list;
- while (item != NULL)
- {
- full_filename = malloc(strlen(path) + strlen(item->filename) + 1);
- strcpy(full_filename, path);
- strcpy(full_filename + strlen(full_filename), item->filename);
- feederLog(LOG_DEBUG, "fiedler: full filename %s\n", full_filename);
- curl_easy_setopt(curl, CURLOPT_URL, full_filename);
- curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
- res = curl_easy_perform(curl);
-
- free(full_filename);
- item->imported = 1;
- item = item->next;
- }
- curl_easy_setopt(curl, CURLOPT_URL, path);
- return res;
- }
- CURLcode ftpMarkImported(CURL * curl)
- {
- struct import_item_t * item;
- char req[512];
- char * temp;
- CURLcode res;
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, NULL);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
- item = import_list;
- while (item != NULL)
- {
- if (item->imported)
- {
- sprintf(req, "RNFR %s", item->filename);
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req);
- curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
- res = curl_easy_perform(curl);
- if (res != 0)
- feederLog(LOG_ERR, "fiedler: curl RNFR error %d\n", res);
- else
- feederLog(LOG_DEBUG, "fiedler: Renamed from %s\n", item->filename);
- temp = strchr(item->filename, '.');
- temp[0] = 0;
- strcpy(item->filename + strlen(item->filename), "_imported.txt");
- sprintf(req, "RNTO %s", item->filename);
- curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req);
- curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
- res = curl_easy_perform(curl);
- if (res != 0)
- feederLog(LOG_ERR, "fiedler: curl RNTO error %d\n", res);
- else
- feederLog(LOG_DEBUG, "fiedler: Renamed to %s\n", item->filename);
- }
- item = item->next;
- }
- freeImportList();
- return 0;
- }
- CURLcode ftpList(CURL * curl)
- {
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, ftpWriteList);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseListLine);
- curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
- curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
- curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
-
- return curl_easy_perform(curl);
- }
- unsigned int timeout(void * lib_data, int sock, unsigned char * data)
- {
- CURLcode res;
- char path[1024];
-
- (void)lib_data;
- (void)sock;
- (void)data;
- feederLog = feederLog_temp;//todo:proc neudrzi feederLog promennou
- if (curl_g)
- {
- strcpy(path, "ftp://fiedler:Yeen0iFa@localhost/DATA/");
- curl_easy_setopt(curl_g, CURLOPT_URL, path);
- res = ftpList(curl_g);
- if (import_list == NULL)
- feederLog(LOG_DEBUG, "fiedler: Nothing to import\n");
- res = ftpImport(curl_g, path);
- res = ftpMarkImported(curl_g);
- }
- (void)res;
- return 0;
- }
- unsigned int
- process(void *lib_data, int sock, unsigned char *data,
- unsigned int length, unsigned long long int *id, time_t * tm,
- double *result_array, uint64_t * sensors, unsigned int *type)
- {
- (void)lib_data;
- (void)sock;
- (void)data;
- (void)length;
- struct measurement_t * measurement;
- if (measurement_list == NULL)
- return 0;
- measurement = measurement_list;
-
- result_array[0] = measurement->value;
- sensors[0] = measurement->sensor_id;
- *id = measurement->unit_id;
- *tm = measurement->time;
- *type = VALUES_TYPE_OBS;
- measurement_list = measurement->next;
- free(measurement);
- return 1;
- }
- int curlDebug(CURL * curl, curl_infotype info, char * string, size_t size, void * data)
- {
- (void)curl;
- (void)data;
- if (feederLog != NULL)
- {
- if (info == CURLINFO_TEXT)
- {
- string[size] = 0;
- feederLog(LOG_DEBUG, "fiedler: %s", string);
- }
- }
- return 0;
- }
- int init(void *param)
- {
- feederLog = NULL;
- param = param;
- curl_global_init(CURL_GLOBAL_DEFAULT);
- printf("Curl version: %s \n", curl_version());
- curl_g = curl_easy_init();
- curl_easy_setopt(curl_g, CURLOPT_DEBUGFUNCTION, curlDebug);
- import_list = 0;
- measurement_list = 0;
- return 0;
- }
- /*
- to test only:
- int main(void)
- {
- unsigned long long int id;
- time_t tp;
- double values[64];
- uint64_t sensors[64];
- unsigned int data_type;
- // timeout(NULL, 0, NULL);
- while (process(NULL, 0, NULL, 0, &id, &tp, values, sensors, &data_type) > 0)
- {
- printf("Zaznam: id %lld, %lld, %f\n", id, sensors[0], values[0]);
- }
-
- return 0;
- }
- */
|