#!/usr/bin/perl # parser pentru mesajele MQTT # Version: 1.2.0 use strict; use warnings; use JSON qw( decode_json ); use DBI; use Proc::Daemon; use POSIX qw(strftime); use System::Timeout qw(system); # pentru debug pun daemon=0 ca sa nu mai ruleze ca daemon my $daemon=1; my $continue; if ($daemon==1){ Proc::Daemon::Init; $continue = 1; $SIG{TERM} = sub { $continue = 0 }; open PID, ">>/var/run/mqttparser.pid" or die "Couldn't open file: $!"; print PID "$$\n"; close PID; } # citesc fisierul de config my $config = '/etc/mqttparser.conf'; open my $CFG, $config or die "Could not open my $config: my $!"; my $db; my $dbuser; my $dbpass; my $dbhost; my $mqttuser; my $mqttpass; my $startime = strftime("%s", localtime(time)); my $row=1; while( my $line = <$CFG>){ my @credentials = split /=/, $line; if ($row==1) {$db = $credentials[1]; chomp($db);} if ($row==2) {$dbuser = $credentials[1]; chomp($dbuser);} if ($row==3) {$dbpass = $credentials[1]; chomp($dbpass);} if ($row==4) {$dbhost = $credentials[1]; chomp($dbhost);} if ($row==5) {$mqttuser = $credentials[1]; chomp($mqttuser);} if ($row==6) {$mqttpass = $credentials[1]; chomp($mqttpass);} $row++; } close $CFG; my $dbh = DBI->connect("DBI:mysql:$db:$dbhost", $dbuser, $dbpass); open(my $SUB, "/usr/bin/mosquitto_sub -u $mqttuser -P $mqttpass -v -t \"#\" |"); while (my $mqttMessage = <$SUB> ) { my $start = substr($mqttMessage, 0, 4); # prelucrez doar mesajele de pe canalele stat/# sau tele/# if (($start eq "stat")||($start eq "tele")){ if (index($mqttMessage, "{") == -1) { #print "Mesajul nu contine json, il sparg dupa spatiu\n" (my $channel, my $nonjson) = split /\ /, $mqttMessage, 2; chomp($nonjson); (my $root, my $topic, my $queue) = split /\//, $channel; # aleg canalele de unde salvez date in DB: if ($queue eq "LWT"){ my $curtime = strftime("%s", localtime(time)); my $uptime = $curtime-$startime; # nu procesez retained LWT cand pornesc mqttparser if ($uptime>20){ # stergem consumul pentru azi si ieri cand este offline priza if ($nonjson eq 'Offline') { delDB($topic); } # salvam ultima stare a releului my $script="php /admin/scripts/mqttparser/last_state.php $topic $nonjson\n"; runSCRIPT ($script); # salvam in DB statusul online/offline addDB ($topic, "LWT", $nonjson); } } else { #print "nu ma intereseaza alte canale fara json"; } } else { #print "Mesajul contine json, il sparg dupa {\n"; my $channel = substr($mqttMessage, 0, index("$mqttMessage", '{')); $channel=~ s/\ //g; # sterg ultimul spatiu (my $root, my $topic, my $queue) = split /\//, $channel; my $json = substr($mqttMessage, index("$mqttMessage", '{'), rindex("$mqttMessage", '}')); # aleg canalele de unde salvez date in DB: if ($queue eq "ENERGY"){ my $decoded = decode_json($json); if (exists $decoded->{'Yesterday'}){ addDB ($topic, "Yesterday", $decoded->{'Yesterday'}); } if (exists $decoded->{'Today'}){ addDB ($topic, "Today", $decoded->{'Today'}); } } if ($queue eq "SENSOR"){ # topic nou sonoff v5.12.0 my $decoded = decode_json($json); if (exists $decoded->{'ENERGY'}{'Yesterday'}){ addDB ($topic, "Yesterday", $decoded->{'ENERGY'}{'Yesterday'}); } if (exists $decoded->{'ENERGY'}{'Today'}){ addDB ($topic, "Today", $decoded->{'ENERGY'}{'Today'}); } } if ($queue eq "RESULT"){ # topic nou sonoff v6.2.1 sonoff bridge my $decoded = decode_json($json); if (exists $decoded->{'RfReceived'}{'Data'}){ addRF ($topic, $decoded->{'RfReceived'}{'Sync'}, $decoded->{'RfReceived'}{'Low'}, $decoded->{'RfReceived'}{'High'}, $decoded->{'RfReceived'}{'Data'}); my $script="php /admin/scripts/mqttparser/rf433.php $topic $decoded->{'RfReceived'}{'Data'}"; runSCRIPT ($script); } } if ($queue eq "ESP"){ my $decoded = decode_json($json); if (exists $decoded->{'Temp'}){ addFILE ($topic, "Temp", $decoded->{'Temp'}); } if (exists $decoded->{'Humi'}){ addFILE ($topic, "Humi", $decoded->{'Humi'}); } if (exists $decoded->{'Module'}){ addDB ($topic, "Module", $decoded->{'Module'}); } if (exists $decoded->{'Version'}){ addDB ($topic, "Version", $decoded->{'Version'}); } if (exists $decoded->{'ESPMac'}){ addDB ($topic, "ESPMac", $decoded->{'ESPMac'}); } if (exists $decoded->{'IPAddress'}){ addDB ($topic, "IPAddress", $decoded->{'IPAddress'}); } if (exists $decoded->{'LoopInterval'}){ addDB ($topic, "LoopInterval", $decoded->{'LoopInterval'}); } } if (index($queue, "INFO") != -1) { my $decoded = decode_json($json); if (exists $decoded->{'Module'}){ addDB ($topic, "Module", $decoded->{'Module'}); } if (exists $decoded->{'Version'}){ addDB ($topic, "Version", $decoded->{'Version'}); } if (exists $decoded->{'IPAddress'}){ addDB ($topic, "IPAddress", $decoded->{'IPAddress'}); } if (exists $decoded->{'RestartReason'}){ addDB ($topic, "RestartReason", $decoded->{'RestartReason'}); } } if ($queue eq "STATE"){ my $decoded = decode_json($json); if (exists $decoded->{'Uptime'}) { addDB ($topic, "Uptime", $decoded->{'Uptime'}); } } if ($queue eq "ALARM"){ $json=~ s/\"/\\"/g; my $script="php /admin/scripts/mqttparser/alarm.php $topic $json"; runSCRIPT ($script); } } } else { #print 'Nu ma intereseaza mesajele astea!\n$mqttMessage'; } } if ($daemon==1){ while ($continue) { } } # rulez scripturi cand primesc input specific via mqtt sub runSCRIPT { (my $script) = @_; my $logfile='/var/log/mqttparser/scripts.log'; my $timealarm = strftime("%d/%m/%Y %H:%M:%S", localtime(time)); open (my $fh, '>>', $logfile) or die "Cannot open $logfile"; print $fh "$timealarm - Am lansat: $script"; close $fh; system('1', "$script"); # lansez scriptul return; } # adaug in fisierele pentru graficele senzorilor sub addFILE { (my $topic, my $camp, my $valoare) = @_; my $cale='/var/www/html/www.clickhome.ro/colect/'; my $filename; if ($camp eq "Temp"){ $filename="temperatura.txt"; } if ($camp eq "Humi"){ $filename="umiditate.txt"; } my $folder = $cale.$topic; unless(-e $folder or mkdir $folder) { } my $cale_fisier=$folder."/".$filename; my $newline=""; if (-e $cale_fisier) { $newline=",\n"; } my $month=strftime("%m", localtime(time)); $month--; my $date = strftime("[Date.UTC(%Y,$month,%d,%H,%M,%S)", localtime(time)); my $graph = $newline.$date.", ".$valoare."]"; open (my $fh, '>>', $cale_fisier) or die "Cannot open $cale_fisier"; print $fh "$graph"; close $fh; return; } # adaug/updatez in tabela getdevinfo din baza de date sub addDB { (my $topic, my $camp, my $valoare) = @_; (my $user, my $devid) = split /\-/, $topic; my $query = "insert into getdevinfo values (?, ?, ?, ?) on duplicate key update camp=?, valoare=?"; my $sth = $dbh->prepare($query) or die "Can't prepare $query: $dbh->errstr\n"; $sth->execute("$user", "$devid", "$camp", "$valoare", "$camp", "$valoare"); $sth->finish(); return; } # sterg din tabela getdevinfo din baza de date sub delDB { (my $topic) = @_; (my $user, my $devid) = split /\-/, $topic; my $query = "DELETE FROM getdevinfo WHERE user=? and devid=? and (camp='Yesterday' || camp='Today')"; my $sth = $dbh->prepare($query) or die "Can't prepare $query: $dbh->errstr\n"; $sth->execute("$user", "$devid"); $sth->finish(); return; } # adaug/updatez in tabela get433info din baza de date sub addRF { (my $topic, my $sync, my $low, my $high, my $data) = @_; (my $user, my $devid) = split /\-/, $topic; my $query = "insert into get433info values (?, ?, ?, ?, ?, ?) on duplicate key update data=?"; my $sth = $dbh->prepare($query) or die "Can't prepare $query: $dbh->errstr\n"; $sth->execute("$user", "$devid", "$sync", "$low", "$high", "$data", "$data"); $sth->finish(); return; } $dbh->disconnect;