/* fiedler.c */
  #include <limits.h>
  #include <stdarg.h>
  #include <stdint.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <syslog.h>
  #include <time.h>

  #include <curl/curl.h>

  //#include "../../status.h"
  #include "../../feeder.h"
  /*
   * One entry of the singly linked list of remote files waiting to be
   * imported.
   */
  typedef struct import_item_t
  {
      /* 0 when the entry is created by addToImport(); set to 1 by ftpImport().
         ftpMarkImported() renames only entries whose flag is non-zero. */
      int imported;
      /* Remote file name (NUL-terminated string). */
      char filename[256];
      /* Following entry, or NULL for the last one. */
      struct import_item_t *next;
  } import_item_t;

  /* Head of the import list; NULL while the list is empty. */
  struct import_item_t *import_list;
  /*
   * One parsed measurement, queued by addMeasurement() and handed out (and
   * freed) one at a time by process().
   */
  typedef struct measurement_t
  {
      unsigned long unit_id;        /* identifier read from the data line */
      unsigned int sensor_id;       /* sensor identifier passed to addMeasurement() */
      time_t time;                  /* time stamp of the measurement */
      double value;                 /* measured value */
      struct measurement_t *next;   /* following entry, or NULL for the last one */
  } measurement_t;

  /* Head of the measurement queue; NULL while the queue is empty. */
  struct measurement_t *measurement_list;
  26. static CURL *curl_g;
  27. static void (* feederLog)(int priority, const char * fmt, ...);
  28. static void * feederLog_temp;
  29. int setLog(void * func)
  30. {
  31. feederLog = (void(*)(int, const char * , ...))func;
  32. feederLog_temp = feederLog;
  33. return 0;
  34. }
  35. void addToImport(char * filename)
  36. {
  37. struct import_item_t * item, * last_item;
  38. item = malloc(sizeof(struct import_item_t));
  39. if (item)
  40. {
  41. strncpy(item->filename, filename, 255);
  42. item->imported = 0;
  43. item->next = NULL;
  44. if (import_list == NULL)
  45. import_list = item;
  46. else
  47. {
  48. last_item = import_list;
  49. while (last_item->next != NULL)
  50. last_item = last_item->next;
  51. last_item->next = item;
  52. }
  53. }
  54. }
  55. void freeImportList(void)
  56. {
  57. struct import_item_t * delete_item, * last_item;
  58. if (import_list == NULL)
  59. return;
  60. last_item = import_list;
  61. while (last_item->next != NULL)
  62. {
  63. delete_item = last_item;
  64. free(delete_item);
  65. last_item = last_item->next;
  66. }
  67. free(last_item);
  68. import_list = NULL;
  69. }
  /*
   * Handle one line of an FTP directory listing.
   *
   * Lines containing the text "imported" are skipped. For every other line
   * the text after the last space (the whole line when there is no space) is
   * taken as the file name and queued with addToImport().
   *
   * filename: NUL-terminated listing line; trailing carriage returns are
   *           removed in place.
   */
  static void parseListLine(char * filename)
  {
      char *name;
      size_t len;

      if (strstr(filename, "imported") != NULL)
          return;

      /* FTP listings are normally CRLF-terminated and the caller splits on
         LF only, so drop the CR that would otherwise end up in the name. */
      len = strlen(filename);
      while (len > 0 && filename[len - 1] == '\r')
          filename[--len] = '\0';

      /* Scan backwards to the character following the last space. The
         position is tested before the character in front of it is read, so
         the scan never looks before the start of the buffer. */
      name = filename + len;
      while (name > filename && name[-1] != ' ')
          name--;

      /* Empty line, or a line ending in a space: there is no name to queue. */
      if (*name == '\0')
          return;

      feederLog(LOG_DEBUG, "Adding file to be imported: %s\n", name);
      addToImport(name);
  }
  /*
   * libcurl write callback that splits the incoming byte stream into lines.
   *
   * buffer:       received bytes (not NUL-terminated)
   * size, nmemb:  the chunk holds size * nmemb bytes
   * data:         a function of type void (*)(char *) passed as void *; it is
   *               called with each completed line (without the '\n'). May be
   *               NULL, in which case lines are assembled and discarded.
   *
   * The line buffer is static, so a line may span several calls. Text that is
   * not yet terminated by '\n' stays buffered until a later call delivers the
   * newline. Lines that do not fit the buffer are dropped as a whole instead
   * of being delivered truncated.
   *
   * Returns size * nmemb, i.e. the whole chunk is always reported as consumed.
   */
  static size_t ftpWriteList(void * buffer, size_t size, size_t nmemb, void * data)
  {
      static char line[256];
      static size_t pos = 0;
      static int too_long = 0;    /* current line exceeded the buffer */
      const char *bytes = buffer;
      const size_t total = size * nmemb;
      void (*line_func)(char *) = (void (*)(char *))data;
      size_t i;

      for (i = 0; i < total; i++)
      {
          if (bytes[i] == '\n')
          {
              line[pos] = '\0';
              if (line_func != NULL && !too_long)
                  line_func(line);
              pos = 0;
              too_long = 0;
          }
          else if (pos < sizeof line - 1)
          {
              /* One byte is always kept free for the terminator. */
              line[pos++] = bytes[i];
          }
          else
          {
              too_long = 1;
          }
      }

      return total;
  }
  105. void addMeasurement(unsigned long unit_id, unsigned long sensor_id, time_t tp, double value)
  106. {
  107. struct measurement_t * measurement, * last_measurement;
  108. measurement = malloc(sizeof(struct measurement_t));
  109. if (measurement)
  110. {
  111. measurement->unit_id = unit_id;
  112. measurement->sensor_id = sensor_id;
  113. measurement->time = tp;
  114. measurement->value = value;
  115. measurement->next = NULL;
  116. if (measurement_list == NULL)
  117. measurement_list = measurement;
  118. else
  119. {
  120. last_measurement = measurement_list; //todo: optimalizaci (pamatovat si posledniho clena), aby nemusel vzdycky prochazet celej list
  121. while (last_measurement->next != NULL)
  122. last_measurement = last_measurement->next;
  123. last_measurement->next = measurement;
  124. }
  125. }
  126. }
  /*
   * Sensor identifier base for each channel number found in a data line
   * (index = channel). parseFileLine() multiplies the entry by 10000 to form
   * the sensor id and skips channels whose entry is 0.
   */
  unsigned long sensor_id_table[] =
  {
      /* channels  0 -  3 */ 0UL, 54002UL, 54001UL, 55001UL,
      /* channels  4 -  7 */ 0UL, 34032UL,     0UL,     0UL,
      /* channels  8 - 11 */ 0UL, 36012UL, 36013UL, 56001UL,
      /* channels 12 - 15 */ 0UL, 37003UL, 34015UL,     0UL
  };
  134. static void parseFileLine(char * line)
  135. {
  136. char * sep, * s;
  137. int i;
  138. unsigned long id;
  139. unsigned int ack_channel;
  140. time_t ack_time;
  141. double value;
  142. struct tm ack_time_tm;
  143. if (line[0] == '#') //this is comment line, skip it
  144. return;
  145. s = line;
  146. i = 0;
  147. ack_time = 0;
  148. ack_channel = 0;
  149. id = 0;
  150. while (sep = strchr(s, '\t'), sep != NULL)
  151. {
  152. *sep = 0;
  153. switch (i)
  154. {
  155. case 0:
  156. id = atol(s);
  157. break;
  158. case 1:
  159. ack_channel = atoi(s);
  160. break;
  161. case 2:
  162. sscanf(s, "%d-%d-%d %d:%d:%d", &ack_time_tm.tm_year, &ack_time_tm.tm_mon, &ack_time_tm.tm_mday, &ack_time_tm.tm_hour, &ack_time_tm.tm_min, &ack_time_tm.tm_sec);
  163. ack_time_tm.tm_year -= 1900;
  164. ack_time_tm.tm_mon -= 1;
  165. ack_time = mktime(&ack_time_tm);
  166. break;
  167. }
  168. s = sep + 1;
  169. i++;
  170. }
  171. sep = strchr(s, ',');
  172. if (sep != NULL)
  173. *sep = '.';
  174. value = atof(s);
  175. if (sensor_id_table[ack_channel] != 0)
  176. addMeasurement(id, sensor_id_table[ack_channel] * 10000, ack_time, value);
  177. // printf("Line: id: %ld, channel: %d, time: %ld, value: %0.2f\n", id, ack_channel, ack_time, value);
  178. }
  179. CURLcode ftpImport(CURL * curl, char * path)
  180. {
  181. CURLcode res;
  182. struct import_item_t * item;
  183. char * full_filename;
  184. res = 0;
  185. curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseFileLine);
  186. item = import_list;
  187. while (item != NULL)
  188. {
  189. full_filename = malloc(strlen(path) + strlen(item->filename) + 1);
  190. strcpy(full_filename, path);
  191. strcpy(full_filename + strlen(full_filename), item->filename);
  192. feederLog(LOG_DEBUG, "fiedler: full filename %s\n", full_filename);
  193. curl_easy_setopt(curl, CURLOPT_URL, full_filename);
  194. curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
  195. res = curl_easy_perform(curl);
  196. free(full_filename);
  197. item->imported = 1;
  198. item = item->next;
  199. }
  200. curl_easy_setopt(curl, CURLOPT_URL, path);
  201. return res;
  202. }
  203. CURLcode ftpMarkImported(CURL * curl)
  204. {
  205. struct import_item_t * item;
  206. char req[512];
  207. char * temp;
  208. CURLcode res;
  209. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, NULL);
  210. curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
  211. item = import_list;
  212. while (item != NULL)
  213. {
  214. if (item->imported)
  215. {
  216. sprintf(req, "RNFR %s", item->filename);
  217. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req);
  218. curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
  219. res = curl_easy_perform(curl);
  220. if (res != 0)
  221. feederLog(LOG_ERR, "fiedler: curl RNFR error %d\n", res);
  222. else
  223. feederLog(LOG_DEBUG, "fiedler: Renamed from %s\n", item->filename);
  224. temp = strchr(item->filename, '.');
  225. temp[0] = 0;
  226. strcpy(item->filename + strlen(item->filename), "_imported.txt");
  227. sprintf(req, "RNTO %s", item->filename);
  228. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, req);
  229. curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
  230. res = curl_easy_perform(curl);
  231. if (res != 0)
  232. feederLog(LOG_ERR, "fiedler: curl RNTO error %d\n", res);
  233. else
  234. feederLog(LOG_DEBUG, "fiedler: Renamed to %s\n", item->filename);
  235. }
  236. item = item->next;
  237. }
  238. freeImportList();
  239. return 0;
  240. }
  241. CURLcode ftpList(CURL * curl)
  242. {
  243. curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, ftpWriteList);
  244. curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)parseListLine);
  245. curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
  246. curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
  247. curl_easy_setopt(curl, CURLOPT_FTPPORT, "localhost:1234");
  248. return curl_easy_perform(curl);
  249. }
  250. unsigned int timeout(void * lib_data, int sock, unsigned char * data)
  251. {
  252. CURLcode res;
  253. char path[1024];
  254. (void)lib_data;
  255. (void)sock;
  256. (void)data;
  257. feederLog = feederLog_temp;//todo:proc neudrzi feederLog promennou
  258. if (curl_g)
  259. {
  260. strcpy(path, "ftp://fiedler:Yeen0iFa@localhost/DATA/");
  261. curl_easy_setopt(curl_g, CURLOPT_URL, path);
  262. res = ftpList(curl_g);
  263. if (import_list == NULL)
  264. feederLog(LOG_DEBUG, "fiedler: Nothing to import\n");
  265. res = ftpImport(curl_g, path);
  266. res = ftpMarkImported(curl_g);
  267. }
  268. (void)res;
  269. return 0;
  270. }
  271. unsigned int
  272. process(void *lib_data, int sock, unsigned char *data,
  273. unsigned int length, unsigned long long int *id, time_t * tm,
  274. double *result_array, uint64_t * sensors, unsigned int *type)
  275. {
  276. (void)lib_data;
  277. (void)sock;
  278. (void)data;
  279. (void)length;
  280. struct measurement_t * measurement;
  281. if (measurement_list == NULL)
  282. return 0;
  283. measurement = measurement_list;
  284. result_array[0] = measurement->value;
  285. sensors[0] = measurement->sensor_id;
  286. *id = measurement->unit_id;
  287. *tm = measurement->time;
  288. *type = VALUES_TYPE_OBS;
  289. measurement_list = measurement->next;
  290. free(measurement);
  291. return 1;
  292. }
  293. int curlDebug(CURL * curl, curl_infotype info, char * string, size_t size, void * data)
  294. {
  295. (void)curl;
  296. (void)data;
  297. if (feederLog != NULL)
  298. {
  299. if (info == CURLINFO_TEXT)
  300. {
  301. string[size] = 0;
  302. feederLog(LOG_DEBUG, "fiedler: %s", string);
  303. }
  304. }
  305. return 0;
  306. }
  307. int init(void *param)
  308. {
  309. feederLog = NULL;
  310. param = param;
  311. curl_global_init(CURL_GLOBAL_DEFAULT);
  312. printf("Curl version: %s \n", curl_version());
  313. curl_g = curl_easy_init();
  314. curl_easy_setopt(curl_g, CURLOPT_DEBUGFUNCTION, curlDebug);
  315. import_list = 0;
  316. measurement_list = 0;
  317. return 0;
  318. }
  /*
  Stand-alone driver, for manual testing only:

  int main(void)
  {
      unsigned long long int id;
      time_t tp;
      double values[64];
      uint64_t sensors[64];
      unsigned int data_type;

      // timeout(NULL, 0, NULL);
      while (process(NULL, 0, NULL, 0, &id, &tp, values, sensors, &data_type) > 0)
      {
          printf("Zaznam: id %lld, %lld, %f\n", id, sensors[0], values[0]);
      }
      return 0;
  }
  */