#include #include #include #include #include #include #include #include "../../status.h" #include "../../feeder.h" static unsigned long int packet; void (* feederLog)(int priority, const char * fmt, ...); int setLog(void * func) { feederLog = func; return 0; } void zerr(int ret) { feederLog(LOG_WARNING, "decompression: "); switch (ret) { case Z_ERRNO: if (ferror(stdin)) feederLog(LOG_WARNING, "error reading stdin\n"); if (ferror(stdout)) feederLog(LOG_WARNING, "error writing stdout\n"); break; case Z_STREAM_ERROR: feederLog(LOG_WARNING, "invalid compression level\n"); break; case Z_DATA_ERROR: feederLog(LOG_WARNING, "invalid or incomplete deflate data\n"); break; case Z_MEM_ERROR: feederLog(LOG_WARNING, "out of memory\n"); break; case Z_BUF_ERROR: feederLog(LOG_WARNING, "buffer error\n"); break; case Z_VERSION_ERROR: feederLog(LOG_WARNING, "zlib version mismatch!\n"); default: feederLog(LOG_WARNING, "no error\n"); break; } } int init(void * param) { return 0; } unsigned long long int dataCheck(char * data, time_t *timestamp, double *value, uint64_t *sensor) { unsigned long long int id; struct tm t; char sensor_str[64]; sscanf(data, "%lld,%ld,%04d-%02d-%02d %02d:%02d:%02d,%lf,%s", &id, &packet, &t.tm_year, &t.tm_mon, &t.tm_mday, &t.tm_hour, &t.tm_min, &t.tm_sec, value, sensor_str); t.tm_year -= 1900; t.tm_mon -= 1; *timestamp = mktime(&t); *sensor = atoll(sensor_str); feederLog(LOG_DEBUG, "Data from %ld, packet number %ld, sensor %s, value %f on %04d-%02d-%02d %02d:%02d:%02d\n", id, packet, sensor_str, *value, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec); return id; } #define HEADER_SIZE 5 #define DATA_SIZE 2048 unsigned int process(void *lib_data, int socket, unsigned char *data, unsigned int length, unsigned long long int *id, time_t * tm, double *result_array, uint64_t * sensors, unsigned int *type) { int raw; int compressed = 0; int dres; unsigned long dlength; static char *ddata = NULL; static char *message = NULL; char *c; unsigned int ret = 0; feederLog(LOG_DEBUG, "senso: Analazing data\n"); if (data != NULL) { if ((data[0] != ':') && (data[0] != ';')) raw = 1; else raw = 0; if (!raw) { switch (data[0]) { case ':':compressed = 1; break; case ';':compressed = 0; break; } dlength = ((unsigned long)data[1]) << 24; dlength += ((unsigned long)data[2]) << 16; dlength += ((unsigned long)data[3]) << 8; dlength += ((unsigned long)data[4]); ddata = malloc(dlength + 1); if (ddata == NULL) { feederLog(LOG_WARNING, "senso: Can not allocate decompress buffer\n"); return 0; } feederLog(LOG_DEBUG, "senso: Packet length %ld, data length %ld%s\n", length, dlength, compressed ? ", compressed" : ""); length -= HEADER_SIZE; if (ddata != NULL) { if (compressed) { dres = uncompress((unsigned char *)ddata, (uLongf *)&dlength, data + HEADER_SIZE, length); } else { memcpy(ddata, data + HEADER_SIZE, length); dres = Z_OK; } if (dres == Z_OK) { ddata[dlength] = 0; feederLog(LOG_DEBUG, "senso: Data, cr: %f%% : \n%s", ((double)length / (double)dlength) * 100.0, ddata); message = ddata; } else { zerr(dres); } } } else { ddata = malloc(length + 1); if (ddata == NULL) { feederLog(LOG_WARNING, "Can not allocate data buffer\n"); return 0; } memcpy(ddata, data, length); ddata[length] = 0; feederLog(LOG_DEBUG, "Data raw: %s\n", data); message = ddata; } } if (message != NULL) { c = strchr(message, '\n'); *c = 0; *id = dataCheck(message, tm, &result_array[0], (unsigned long int *)&sensors[0]); message = c + 1; ret = 1; if (strlen(message) == 0) { free(ddata); message = ddata = NULL; } } *type = VALUES_TYPE_OBS; return ret; } int reply(void *lib_data, int socket, unsigned char *data) { feederLog(LOG_DEBUG, "senso: replying\n"); sprintf((char *)data, "OK%ld", packet); feederLog(LOG_DEBUG, "senso: Acquiring with %s\n", data); return strlen((char *)data); }