All checks were successful
dev test / test (push) Successful in 17s
dev test / vulnCheck (push) Successful in 30s
dev test / Ci-Lint (push) Successful in 19s
${{ github.actor }} executed Build Push Prod / build (push) Successful in 4m24s
${{ github.actor }} executed Build Push Prod / deploy (push) Successful in 18s
454 lines
12 KiB
Go
454 lines
12 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
_ "embed"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"git.maximotejeda.com/maximo/dolar/internal/adapter/nats"
|
|
"git.maximotejeda.com/maximo/dolar/internal/application/core/domain"
|
|
"git.maximotejeda.com/maximo/dolar/internal/ports"
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
//go:embed schema.sql
|
|
var schema string
|
|
|
|
// History is the storage-level shape of one row of the histories table: a
// buy/sell quote recorded for an institution.
type History struct {
	// ID is the histories row id.
	ID int64 `json:"id"`
	// NameID is the id of the institution the quote belongs to (name_id column).
	NameID int64 `json:"name_id"`
	// Compra is the buy value of the quote.
	Compra float64 `json:"compra"`
	// Venta is the sell value of the quote.
	Venta float64 `json:"venta"`
	// Parser names the parser that produced the quote.
	Parser string `json:"parser"`
	// Parsed is a Unix timestamp in seconds.
	Parsed int64 `json:"parsed"`
}
|
|
|
|
// Institution is the storage-level shape of one row of the institutions
// table.
type Institution struct {
	// ID is the institutions row id.
	ID int64 `json:"id"`
	// Name is the institution's full name; lookups in this package key on it.
	Name string `json:"name"`
	// ShortName is the abbreviated name (short_name column).
	ShortName string `json:"short_name"`
	// Created records when the institution was registered.
	// NOTE(review): presumably a Unix timestamp — confirm against schema.sql.
	Created int64 `json:"created"`
}
|
|
|
|
type Adapter struct {
|
|
db *sql.DB
|
|
log *slog.Logger
|
|
nats ports.NATSPort
|
|
}
|
|
|
|
func NewAdapter(dataSourceURL string, natsConn ports.NATSPort) (*Adapter, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
log := slog.Default().With("adapter", "db")
|
|
pragmas := "cache=shared&_foreign_keys=on&_busy_timeout=3000&_journal_mode=WAL"
|
|
db, err := sql.Open("sqlite", fmt.Sprintf("file:%s?%s", dataSourceURL, pragmas))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecion error: %w", err)
|
|
}
|
|
err = db.PingContext(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("ping error: %w", err)
|
|
}
|
|
db.SetConnMaxIdleTime(10 * time.Second)
|
|
CreateTables(db)
|
|
// TODO create tables and trigers on first run
|
|
return &Adapter{db: db, log: log, nats: natsConn}, nil
|
|
}
|
|
|
|
// Schema
|
|
func CreateTables(db *sql.DB) {
|
|
_, err := db.Exec(schema)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// Save
|
|
// TODO
|
|
func (a *Adapter) Save(history *domain.History) error {
|
|
if a.db == nil {
|
|
return fmt.Errorf("nil or empty database")
|
|
}
|
|
if history == nil {
|
|
return fmt.Errorf("nil struct passed [%v]", history)
|
|
}
|
|
inst, err := a.GETInstitution(history.Institution.Name)
|
|
if err != nil {
|
|
a.log.Error("geting inst ", "error", err)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
change := domain.NewChange(domain.History{}, *history)
|
|
message := domain.NewMessage("change adding institution", change, err)
|
|
if v, ok := a.nats.(*nats.Adapter); ok && v != nil {
|
|
err = a.nats.NewInstRegistered(message)
|
|
if err != nil {
|
|
a.log.Error(err.Error())
|
|
}
|
|
}
|
|
id, _ := a.ADDInstitution(history.Institution.Name, history.Institution.ShortName)
|
|
inst = &Institution{ID: id}
|
|
a.log.Info("Adding institution", "inst", inst)
|
|
}
|
|
}
|
|
// TODO: look for institution inside db
|
|
// If inst is alredy in db pass ID to History
|
|
// Else Create a new institution
|
|
hist := History{
|
|
NameID: inst.ID,
|
|
Compra: history.Compra,
|
|
Venta: history.Venta,
|
|
Parser: history.Parser,
|
|
Parsed: history.Parsed,
|
|
}
|
|
|
|
lH, err := a.GetLatest(history.Institution.Name)
|
|
|
|
if errors.Is(err, sql.ErrNoRows ) {
|
|
a.log.Info("adding new item to table: ", "parse", history.Parser, "name", history.Institution.Name)
|
|
change := domain.NewChange(domain.History{}, *history)
|
|
message := domain.NewMessage("change adding institution", change, err)
|
|
if v, ok := a.nats.(*nats.Adapter); ok && v != nil {
|
|
err = a.nats.ChangeRegistered(message)
|
|
if err != nil {
|
|
a.log.Error(err.Error())
|
|
}
|
|
}
|
|
return a.AddNew(hist, inst.ID)
|
|
}
|
|
|
|
if history.Compra == lH.Compra && history.Venta == lH.Venta {
|
|
return nil
|
|
}else if history.Compra == 0 || history.Venta == 0 {
|
|
return nil
|
|
}else {
|
|
// if one of them changes create a new row
|
|
a.log.Info("change registered, adding item", "parse", history.Parser, "name", history.Institution.Name, "compra enter", history.Compra, "compra db", hist.Compra, "venta enter", history.Venta, "venta db", hist.Venta)
|
|
ins, err := a.GETInstitution(history.Institution.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
change := domain.NewChange(*lH, *history)
|
|
message := domain.NewMessage("change registered", change, nil)
|
|
if v, ok := a.nats.(*nats.Adapter); ok && v != nil {
|
|
a.log.Info("calling nats", "struct", ok)
|
|
err = a.nats.ChangeRegistered(message)
|
|
if err != nil {
|
|
a.log.Error("sending change to nats", "error", err)
|
|
}
|
|
}
|
|
|
|
return a.AddNew(hist, int64(ins.ID))
|
|
}
|
|
}
|
|
|
|
// GetLatest
|
|
// TODO
|
|
func (a *Adapter) GetLatest(name string) (*domain.History, error) {
|
|
inst := Institution{}
|
|
|
|
// TODO check inst in db and get latest price
|
|
hist := &History{}
|
|
stmtt, err := a.db.Prepare(`
|
|
SELECT i.id, i.name, i.short_name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h
|
|
JOIN institutions as i ON h.name_id = i.id
|
|
JOIN actual_price as ac ON i.id = ac.inst_id AND h.id = ac.hist_id
|
|
WHERE i.name = ? AND ac.hist_id = h.id;`)
|
|
if err != nil {
|
|
a.log.Error("preparing stmtt", "error", err.Error())
|
|
return nil, err
|
|
}
|
|
defer stmtt.Close()
|
|
|
|
if err := stmtt.QueryRow(name).Scan(&hist.ID, &inst.Name, &inst.ShortName, &hist.Parser, &hist.Compra, &hist.Venta, &hist.Parsed); err != nil {
|
|
a.log.Error("getting latest", "error", err.Error(), "parser", hist.Parser, "name", name)
|
|
return nil, err
|
|
}
|
|
dHist := &domain.History{
|
|
ID: hist.ID,
|
|
Institution: domain.Institution{Name: inst.Name, ShortName: inst.ShortName},
|
|
Compra: hist.Compra,
|
|
Venta: hist.Venta,
|
|
Parser: hist.Parser,
|
|
Parsed: hist.Parsed,
|
|
}
|
|
|
|
return dHist, nil
|
|
|
|
}
|
|
|
|
// GetSince
|
|
// TODO
|
|
func (a *Adapter) GetSince(name string, duration int64) (hists []*domain.History, err error) {
|
|
tDuration := time.Now().Add(-time.Minute * time.Duration(duration)).Unix()
|
|
a.log.Info("getsince", "name", name, "duration", duration, "unixDuration", tDuration)
|
|
stmt, err := a.db.Prepare("SELECT h.id, i.name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h JOIN institutions as i ON h.name_id = i.id WHERE i.name = ? AND h.parsed > ? ORDER BY parsed DESC;")
|
|
if err != nil {
|
|
a.log.Error("[GetChangeSince] preparing", "error", err.Error())
|
|
return nil, err
|
|
}
|
|
defer stmt.Close()
|
|
rows, err := stmt.Query(name, tDuration)
|
|
if err != nil {
|
|
a.log.Error("[GetChangeSince] preparing", "error", err.Error())
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
inst := Institution{}
|
|
hist := History{}
|
|
if err := rows.Scan(&hist.ID, &inst.Name, &hist.Parser, &hist.Compra, &hist.Venta, &hist.Parsed); err != nil {
|
|
a.log.Error("[GetChangeSince] scanning", "error", err)
|
|
return nil, err
|
|
}
|
|
|
|
hists = append(hists,
|
|
&domain.History{
|
|
ID: hist.ID,
|
|
Institution: domain.Institution{
|
|
Name: inst.Name,
|
|
},
|
|
Compra: hist.Compra,
|
|
Venta: hist.Venta,
|
|
Parser: hist.Parser,
|
|
Parsed: hist.Parsed,
|
|
})
|
|
}
|
|
return hists, nil
|
|
|
|
}
|
|
|
|
// GetInstByType
|
|
// Get institutions names if bancos, cajas or agentes is passed
|
|
func (a *Adapter) GetInstByType(name string) ([]string, error) {
|
|
var (
|
|
stmt *sql.Stmt
|
|
err error
|
|
)
|
|
switch name {
|
|
case "bancos":
|
|
stmt, err = a.db.Prepare(`
|
|
SELECT i.name
|
|
FROM institutions AS i
|
|
JOIN histories AS h ON i.id = h.name_id
|
|
WHERE (i.name LIKE '%ban%' OR i.name LIKE '%scoti%') AND h.name_id IS NOT NULL
|
|
`)
|
|
|
|
case "cajas":
|
|
stmt, err = a.db.Prepare(`
|
|
SELECT i.name
|
|
FROM institutions AS i
|
|
JOIN histories AS h ON i.id = h.name_id
|
|
WHERE i.name LIKE '%asociacion%' AND h.name_id IS NOT NULL`)
|
|
case "agentes":
|
|
stmt, err = a.db.Prepare(`
|
|
SELECT i.name FROM
|
|
institutions AS i
|
|
JOIN histories AS h ON i.id = h.name_id
|
|
WHERE i.name NOT LIKE '%ban%' AND i.name NOT LIKE '%scoti%' AND i.name NOT LIKE '%asociacion%'`)
|
|
default:
|
|
err = fmt.Errorf("name not recognized")
|
|
}
|
|
if err != nil {
|
|
a.log.Error("[inst-GetAll]", "error", err)
|
|
return nil, err
|
|
}
|
|
defer stmt.Close()
|
|
rows, err := stmt.Query()
|
|
if err != nil {
|
|
a.log.Error("[inst-GetAll-stmt]", "error", err)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
insts := []string{}
|
|
for rows.Next() {
|
|
inst := ""
|
|
if err = rows.Scan(&inst); err != nil {
|
|
return nil, err
|
|
}
|
|
if inst == "" {
|
|
continue
|
|
}
|
|
insts = append(insts, inst)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return insts, err
|
|
}
|
|
return insts, nil
|
|
}
|
|
|
|
func (a *Adapter) ADDInstitution(name, short string) (id int64, err error) {
|
|
stmt, err := a.db.Prepare("INSERT INTO institutions (name, short_name, created) VALUES(?,?,?);")
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer stmt.Close()
|
|
parsed := time.Now().Format(time.DateTime)
|
|
res, err := stmt.Exec(&name, short, &parsed)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
id, err = res.LastInsertId()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return id, nil
|
|
|
|
}
|
|
|
|
// GETInstitutionByID
|
|
func (a *Adapter) GETInstitutionByIDS(ids string) (instList []*Institution, err error) {
|
|
|
|
stmtt, err := a.db.Prepare(fmt.Sprintf("SELECT id, name, short_name FROM institutions WHERE id IN (%s)", ids))
|
|
if err != nil {
|
|
a.log.Error("preparing stmt", "error", err.Error())
|
|
return nil, err
|
|
}
|
|
defer stmtt.Close()
|
|
|
|
rows, err := stmtt.Query()
|
|
if err != nil {
|
|
a.log.Error("getting institution", "error", err.Error(), "id", ids)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
insti := Institution{}
|
|
if err = rows.Scan(&insti.ID, &insti.Name, &insti.ShortName); err != nil {
|
|
a.log.Error("[getting institution scanning]", "error", err.Error(), "id", ids)
|
|
continue
|
|
}
|
|
instList = append(instList, &insti)
|
|
}
|
|
return instList, nil
|
|
}
|
|
|
|
// GETInstitution
|
|
func (a *Adapter) GETInstitution(name string) (inst *Institution, err error) {
|
|
inst = &Institution{}
|
|
stmtt, err := a.db.Prepare("SELECT i.id, i.name, i.short_name FROM institutions AS i WHERE i.name = ?")
|
|
if err != nil {
|
|
a.log.Error("preparing stmt", "error", err.Error())
|
|
return nil, err
|
|
}
|
|
defer stmtt.Close()
|
|
if err := stmtt.QueryRow(name).Scan(&inst.ID, &inst.Name, &inst.ShortName); err != nil {
|
|
a.log.Error("getting institution", "error", err.Error(), "short name", inst.ShortName, "name", name)
|
|
return nil, err
|
|
}
|
|
return inst, nil
|
|
}
|
|
|
|
// AddNew
|
|
// Add a new row in the dolar table
|
|
// Will send to nats changes on prices
|
|
func (a *Adapter) AddNew(row History, id int64) error {
|
|
stmt, err := a.db.Prepare("INSERT INTO histories (name_id, compra, venta, parser, parsed) VALUES(?,?,?,?,?);")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
_, err = stmt.Exec(&id, &row.Compra, &row.Venta, &row.Parser, time.Now().Unix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a Adapter) GetInstByName(name string) (*domain.Institution, error) {
|
|
inst, err := a.GETInstitution(name)
|
|
insti := &domain.Institution{
|
|
ID: inst.ID,
|
|
Name: inst.Name,
|
|
ShortName: inst.ShortName,
|
|
Created: inst.Created,
|
|
}
|
|
return insti, err
|
|
}
|
|
|
|
func (a Adapter) TGBSubscribe(tgb_id int64, inst_name string) (bool, error) {
|
|
stmt, err := a.db.Prepare(`
|
|
INSERT INTO subscriptions(tgb_id, inst_id, created)
|
|
VALUES(?,(SELECT id FROM institutions WHERE name = ?),strftime('%s', 'now'));
|
|
`)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer stmt.Close()
|
|
_, err = stmt.Exec(tgb_id, inst_name)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
func (a Adapter) TGBUnsubscribe(tgb_id int64, inst_name string) (bool, error) {
|
|
stmt, err := a.db.Prepare(`
|
|
DELETE FROM subscriptions
|
|
WHERE tgb_id = ? AND inst_id = (SELECT id FROM institutions WHERE name = ?)
|
|
`)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer stmt.Close()
|
|
_, err = stmt.Exec(tgb_id, inst_name)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
func (a Adapter) TGBGetSubscribedUsers(inst_name string) ([]int64, error) {
|
|
stmt, err := a.db.Prepare(`
|
|
SELECT s.tgb_id FROM subscriptions AS s
|
|
JOIN institutions AS i ON s.inst_id = i.id
|
|
WHERE i.name = ?;
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer stmt.Close()
|
|
rows, err := stmt.Query(inst_name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
tgbList := []int64{}
|
|
for rows.Next() {
|
|
var tgb int64
|
|
err := rows.Scan(&tgb)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tgbList = append(tgbList, tgb)
|
|
}
|
|
return tgbList, nil
|
|
}
|
|
func (a Adapter) TGBGetSubscribedInst(tgb_id int64) ([]string, error) {
|
|
stmt, err := a.db.Prepare(`
|
|
SELECT i.name FROM subscriptions AS s
|
|
JOIN institutions AS i ON s.inst_id = i.id
|
|
WHERE s.tgb_id = ?;
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer stmt.Close()
|
|
rows, err := stmt.Query(tgb_id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
instList := []string{}
|
|
for rows.Next() {
|
|
var inst string
|
|
err := rows.Scan(&inst)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
instList = append(instList, inst)
|
|
}
|
|
return instList, nil
|
|
}
|