#!/usr/bin/perl
# Parser for MQTT messages
# Version: 1.0.6
use strict;
use warnings;
use JSON qw( decode_json );
use DBI;
use Proc::Daemon;
use POSIX qw(strftime);
# For debugging, set $daemon to 0 so the script does not detach and run as a daemon.
my $daemon=1;
# Run flag: set to 1 in daemon mode and cleared by the SIGTERM handler below.
my $continue;
if ($daemon==1){
    Proc::Daemon::Init;
    $continue = 1;
    # SIGTERM only clears the flag; the code further down polls it.
    $SIG{TERM} = sub { $continue = 0 };
    # Record our PID. The file is truncated on every start so that PIDs of
    # earlier runs do not pile up in it.
    my $pidfile = '/var/run/mqttparser.pid';
    open my $PID, '>', $pidfile or die "Couldn't open file: $!";
    print {$PID} "$$\n";
    close $PID or die "Couldn't write $pidfile: $!";
}
# Read the configuration file.
# The file is positional: the text after the first '=' on lines 1 to 6 is
# stored, in this order, as database name, database user, database password,
# database host, MQTT user and MQTT password. The text before the '=' is not
# checked, and lines after the sixth are ignored.
my $config = '/etc/mqttparser.conf';
open my $CFG, '<', $config or die "Could not open $config: $!";
my $db;
my $dbuser;
my $dbpass;
my $dbhost;
my $mqttuser;
my $mqttpass;
my $row=1;
# Destination variable for each line, by position.
my @targets = (\$db, \$dbuser, \$dbpass, \$dbhost, \$mqttuser, \$mqttpass);
while (my $line = <$CFG>) {
    # Strip the line ending (LF or CRLF) before splitting.
    $line =~ s/\r?\n\z//;
    # Split on the first '=' only, so values that contain '=' themselves
    # (passwords, for instance) are kept whole.
    my @credentials = split /=/, $line, 2;
    if ($row <= @targets) {
        ${ $targets[$row - 1] } = $credentials[1];
    }
    $row++;
}
close $CFG;
# Connect to the database named in the configuration file. Stop right away
# when the connection fails: addDB and delDB cannot work without $dbh.
my $dbh = DBI->connect("DBI:mysql:$db:$dbhost", $dbuser, $dbpass)
    or die "Could not connect to database: $DBI::errstr";

# Subscribe to every topic ("#") through mosquitto_sub. The list form of the
# pipe open runs the program without a shell, so credentials that contain
# spaces or shell metacharacters are passed through unchanged.
open(my $SUB, '-|', '/usr/bin/mosquitto_sub',
    '-u', $mqttuser, '-P', $mqttpass, '-v', '-t', '#')
    or die "Could not start mosquitto_sub: $!";

# Fields that are stored for each queue (third topic segment):
# "file" fields are handed to addFILE, "db" fields to addDB.
my %mqtt_fields_for = (
    ENERGY => { file => [],               db => [qw(Yesterday Today)] },
    ESP    => { file => [qw(Temp Humi)],  db => [qw(Module Version ESPMac IPAddress LoopInterval)] },
    INFO   => { file => [],               db => [qw(Module Version IPAddress RestartReason)] },
);

# Each line read from the pipe is handled as "<topic> <payload>".
while (my $mqttMessage = <$SUB>) {
    # The TERM handler only clears $continue, so it has to be checked here;
    # otherwise the daemon keeps processing for as long as messages arrive.
    last if $daemon==1 && !$continue;

    # Only messages on the stat/# and tele/# channels are processed.
    my $start = substr($mqttMessage, 0, 4);
    next unless $start eq "stat" || $start eq "tele";

    my $json_start = index($mqttMessage, '{');
    if ($json_start == -1) {
        # No JSON in the message: split topic and payload at the first space.
        my ($channel, $nonjson) = split /\ /, $mqttMessage, 2;
        $nonjson = '' unless defined $nonjson;
        chomp($nonjson);
        my (undef, $topic, $queue) = split /\//, $channel;
        next unless defined $queue;
        # LWT is the only non-JSON channel that is saved to the database.
        if ($queue eq "LWT") {
            # When the plug goes offline, drop today's and yesterday's consumption.
            delDB($topic) if $nonjson eq 'Offline';
            addDB($topic, "LWT", $nonjson);
        }
        next;
    }

    # The message carries JSON: the topic is everything before the first '{',
    # the payload runs from the first '{' to the last '}'.
    my $json_end = rindex($mqttMessage, '}');
    next if $json_end < $json_start;
    my $channel = substr($mqttMessage, 0, $json_start);
    $channel =~ s/\ //g; # remove the space that separates topic and payload
    my (undef, $topic, $queue) = split /\//, $channel;
    next unless defined $queue;

    # Pick the field lists for this queue; other queues are ignored.
    my $wanted = $queue eq "ENERGY"          ? $mqtt_fields_for{ENERGY}
               : $queue eq "ESP"             ? $mqtt_fields_for{ESP}
               : index($queue, "INFO") != -1 ? $mqtt_fields_for{INFO}
               :                               undef;
    next unless $wanted;

    # substr takes a length, not an end position.
    my $json = substr($mqttMessage, $json_start, $json_end - $json_start + 1);
    # decode_json dies on malformed input; one bad payload must not stop the parser.
    my $decoded = eval { decode_json($json) };
    if (ref($decoded) ne 'HASH') {
        warn "Skipping message on $channel: payload is not a JSON object\n";
        next;
    }

    for my $field (@{ $wanted->{file} }) {
        addFILE($topic, $field, $decoded->{$field}) if exists $decoded->{$field};
    }
    for my $field (@{ $wanted->{db} }) {
        addDB($topic, $field, $decoded->{$field}) if exists $decoded->{$field};
    }
}
# In daemon mode, stay alive after the message loop has ended until the
# SIGTERM handler clears $continue.
if ($daemon==1){
    while ($continue) {
        # Sleep between checks instead of spinning at full CPU; an incoming
        # signal interrupts the sleep, so the flag is re-checked promptly.
        sleep 1;
    }
}
# Append one sensor reading to the data file used for the sensor graphs.
#
# Parameters:
#   $topic   - name of the sub-directory (below the base path) that holds the files
#   $camp    - field name; only "Temp" and "Humi" are handled, anything else is ignored
#   $valoare - the reading, written to the file as-is
#
# Every reading is written as "[Date.UTC(Y,M,D,h,m,s), value]" with a zero-based
# month; when the file already has content the entry is preceded by ",\n".
# NOTE(review): the timestamp fields come from localtime although they are
# wrapped in Date.UTC(...) - confirm which time zone the consumer expects.
#
# Returns nothing. Dies when the data file cannot be opened.
sub addFILE {
    my ($topic, $camp, $valoare) = @_;
    my $cale='/var/www/html/www.clickhome.ro/colect/';
    my %file_for = (
        Temp => "temperatura.txt",
        Humi => "umiditate.txt",
    );
    return unless defined $topic && defined $camp;
    my $filename = $file_for{$camp};
    return unless defined $filename;

    # The topic becomes part of a file system path: refuse values that would
    # leave the base directory.
    if ($topic eq '..' || index($topic, '/') != -1) {
        warn "Refusing to use topic '$topic' as a directory name\n";
        return;
    }

    my $folder = $cale.$topic;
    if (!-e $folder && !mkdir $folder) {
        warn "Cannot create $folder: $!";
    }
    my $cale_fisier=$folder."/".$filename;

    # Separate entries with ",\n", but only when the file already has content.
    my $newline = -s $cale_fisier ? ",\n" : "";

    # Take one snapshot of the clock so all fields belong to the same instant.
    my @now = localtime(time);
    my $month = $now[4]; # localtime months are already zero-based
    my $date = strftime("[Date.UTC(%Y,$month,%d,%H,%M,%S)", @now);
    my $graph = $newline.$date.", ".$valoare."]";

    open (my $fh, '>>', $cale_fisier) or die "Cannot open $cale_fisier: $!";
    print {$fh} "$graph";
    close $fh or warn "Cannot write $cale_fisier: $!";
    return;
}
# Insert or update one value in the getdevinfo table.
#
# Parameters:
#   $topic   - device topic; the parts before and after the first '-' are used
#              as user and device id (anything after a second '-' is dropped)
#   $camp    - field name
#   $valoare - field value
#
# Returns nothing. Dies when the statement cannot be prepared.
sub addDB {
    my ($topic, $camp, $valoare) = @_;
    my ($user, $devid) = split /\-/, $topic;
    my $query = "insert into getdevinfo values (?, ?, ?, ?) on duplicate key update camp=?, valoare=?";
    # A method call is not interpolated inside a string, so the error text
    # has to be concatenated.
    my $sth = $dbh->prepare($query) or die "Can't prepare $query: " . $dbh->errstr . "\n";
    # Bind everything as a string; missing parts become empty strings.
    my @bind = map { defined $_ ? "$_" : '' } ($user, $devid, $camp, $valoare);
    $sth->execute(@bind, @bind[2, 3]);
    $sth->finish();
    return;
}
# Delete the 'Yesterday' and 'Today' rows of one device from the getdevinfo table.
#
# Parameters:
#   $topic - device topic; the parts before and after the first '-' are used
#            as user and device id (anything after a second '-' is dropped)
#
# Returns nothing. Dies when the statement cannot be prepared.
sub delDB {
    my ($topic) = @_;
    my ($user, $devid) = split /\-/, $topic;
    # IN (...) instead of '||': depending on the server's SQL mode '||' means
    # string concatenation rather than OR.
    my $query = "DELETE FROM getdevinfo WHERE user=? and devid=? and camp IN ('Yesterday', 'Today')";
    # A method call is not interpolated inside a string, so the error text
    # has to be concatenated.
    my $sth = $dbh->prepare($query) or die "Can't prepare $query: " . $dbh->errstr . "\n";
    # Bind both values as strings; missing parts become empty strings.
    my @bind = map { defined $_ ? "$_" : '' } ($user, $devid);
    $sth->execute(@bind);
    $sth->finish();
    return;
}
# Close the database connection.
$dbh->disconnect;