maximo tejeda 4b60ebc7a7
All checks were successful
dev test / test (push) Successful in 17s
dev test / vulnCheck (push) Successful in 30s
dev test / Ci-Lint (push) Successful in 19s
${{ github.actor }} executed Build Push Prod / build (push) Successful in 4m24s
${{ github.actor }} executed Build Push Prod / deploy (push) Successful in 18s
FIRST commit
2024-12-02 16:07:48 -04:00

454 lines
12 KiB
Go

package db
import (
	"context"
	"database/sql"
	_ "embed"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"time"

	"git.maximotejeda.com/maximo/dolar/internal/adapter/nats"
	"git.maximotejeda.com/maximo/dolar/internal/application/core/domain"
	"git.maximotejeda.com/maximo/dolar/internal/ports"
	_ "modernc.org/sqlite"
)
// schema holds the contents of schema.sql, embedded into the binary at build
// time. CreateTables executes it against the database.
//
//go:embed schema.sql
var schema string
// History is the storage-level form of a single price reading that belongs
// to one institution.
type History struct {
	// ID identifies the reading.
	ID int64 `json:"id"`
	// NameID is the ID of the institution the reading belongs to
	// (histories.name_id in the queries of this package).
	NameID int64 `json:"name_id"`
	// Compra is the buy price.
	Compra float64 `json:"compra"`
	// Venta is the sell price.
	Venta float64 `json:"venta"`
	// Parser is the parser label copied from the domain history.
	Parser string `json:"parser"`
	// Parsed is a Unix timestamp in seconds; AddNew writes time.Now().Unix()
	// to the corresponding column.
	Parsed int64 `json:"parsed"`
}
// Institution is the storage-level form of an institution whose prices are
// tracked.
type Institution struct {
	// ID identifies the institution.
	ID int64 `json:"id"`
	// Name is the full name; lookups in this package match on it.
	Name string `json:"name"`
	// ShortName is the abbreviated name.
	ShortName string `json:"short_name"`
	// Created is the creation timestamp. GETInstitution does not select this
	// column, so the field stays zero on values it returns.
	Created int64 `json:"created"`
}
// Adapter is the database adapter: it bundles the SQL handle with the logger
// and the NATS port used by its methods.
type Adapter struct {
	// db is the open database handle every query goes through.
	db *sql.DB
	// log is the logger used by the adapter's methods.
	log *slog.Logger
	// nats is the port Save uses to publish change notifications.
	nats ports.NATSPort
}
// NewAdapter opens the SQLite database at dataSourceURL, checks that it is
// reachable, applies the embedded schema and returns an Adapter bound to it.
//
// dataSourceURL is inserted into a "file:<path>?<params>" URI. natsConn is
// stored on the Adapter unchanged.
//
// An error is returned when the database cannot be opened, does not answer a
// ping within five seconds, or rejects the schema script. In the last two
// cases the handle is closed before returning so it does not leak.
func NewAdapter(dataSourceURL string, natsConn ports.NATSPort) (*Adapter, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// The modernc.org/sqlite driver applies connection settings through
	// repeated "_pragma" parameters. The "_foreign_keys", "_busy_timeout" and
	// "_journal_mode" spellings belong to a different driver and are not
	// applied here, which left foreign keys off, no busy timeout and the
	// default journal mode.
	const params = "cache=shared" +
		"&_pragma=foreign_keys(1)" +
		"&_pragma=busy_timeout(3000)" +
		"&_pragma=journal_mode(WAL)"

	db, err := sql.Open("sqlite", fmt.Sprintf("file:%s?%s", dataSourceURL, params))
	if err != nil {
		return nil, fmt.Errorf("connection error: %w", err)
	}
	if err := db.PingContext(ctx); err != nil {
		// The ping error is the one worth reporting; a close failure on a
		// handle that never worked adds nothing.
		_ = db.Close()
		return nil, fmt.Errorf("ping error: %w", err)
	}
	db.SetConnMaxIdleTime(10 * time.Second)

	// Run the schema here and report failure as an error rather than letting
	// a constructor panic.
	if _, err := db.Exec(schema); err != nil {
		_ = db.Close()
		return nil, fmt.Errorf("applying schema: %w", err)
	}

	return &Adapter{
		db:   db,
		log:  slog.Default().With("adapter", "db"),
		nats: natsConn,
	}, nil
}
// CreateTables executes the embedded schema script against db.
//
// It panics with the underlying error if the script cannot be executed.
func CreateTables(db *sql.DB) {
	if _, execErr := db.Exec(schema); execErr != nil {
		panic(execErr)
	}
}
// Save records a price reading for an institution.
//
// The flow is:
//  1. Look the institution up by name. If it does not exist yet, publish a
//     "new institution" message (when a NATS adapter is attached) and insert
//     the institution.
//  2. Fetch the latest stored price for that institution. If there is none,
//     publish a change message and insert the reading.
//  3. Otherwise insert a new row only when the buy or sell price differs from
//     the latest one and neither incoming price is zero; a change message is
//     published first.
//
// NATS publishing is best-effort: a publish failure is logged and does not
// stop the reading from being stored. Database failures are returned.
func (a *Adapter) Save(history *domain.History) error {
	if a.db == nil {
		return fmt.Errorf("nil or empty database")
	}
	if history == nil {
		return fmt.Errorf("nil struct passed [%v]", history)
	}
	name := history.Institution.Name

	inst, err := a.GETInstitution(name)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// Unknown institution: announce it, then create it.
		change := domain.NewChange(domain.History{}, *history)
		message := domain.NewMessage("change adding institution", change, err)
		if a.natsReady() {
			if nerr := a.nats.NewInstRegistered(message); nerr != nil {
				a.log.Error(nerr.Error())
			}
		}
		id, addErr := a.ADDInstitution(name, history.Institution.ShortName)
		if addErr != nil {
			// Previously ignored, which stored the reading under ID 0.
			return fmt.Errorf("adding institution %q: %w", name, addErr)
		}
		inst = &Institution{ID: id}
		a.log.Info("Adding institution", "inst", inst)
	case err != nil:
		// Any other lookup failure leaves inst nil; stop instead of
		// dereferencing it below.
		return fmt.Errorf("getting institution %q: %w", name, err)
	}

	hist := History{
		NameID: inst.ID,
		Compra: history.Compra,
		Venta:  history.Venta,
		Parser: history.Parser,
		Parsed: history.Parsed,
	}

	lH, err := a.GetLatest(name)
	if errors.Is(err, sql.ErrNoRows) {
		// No price stored yet for this institution: always insert.
		a.log.Info("adding new item to table: ", "parse", history.Parser, "name", name)
		change := domain.NewChange(domain.History{}, *history)
		message := domain.NewMessage("change adding institution", change, err)
		if a.natsReady() {
			if nerr := a.nats.ChangeRegistered(message); nerr != nil {
				a.log.Error(nerr.Error())
			}
		}
		return a.AddNew(hist, inst.ID)
	}
	if err != nil {
		// lH is nil on any other failure; comparing against it would panic.
		return fmt.Errorf("getting latest price for %q: %w", name, err)
	}

	if history.Compra == lH.Compra && history.Venta == lH.Venta {
		return nil // nothing changed
	}
	if history.Compra == 0 || history.Venta == 0 {
		return nil // a zero price is not stored as a change
	}

	// At least one price changed: store a new row. The "db" values come from
	// the latest stored row, not from the incoming reading.
	a.log.Info("change registered, adding item",
		"parse", history.Parser, "name", name,
		"compra enter", history.Compra, "compra db", lH.Compra,
		"venta enter", history.Venta, "venta db", lH.Venta)

	change := domain.NewChange(*lH, *history)
	message := domain.NewMessage("change registered", change, nil)
	if a.natsReady() {
		a.log.Info("calling nats", "struct", true)
		if nerr := a.nats.ChangeRegistered(message); nerr != nil {
			a.log.Error("sending change to nats", "error", nerr)
		}
	}
	// inst.ID is already known here; no second institution lookup is needed.
	return a.AddNew(hist, inst.ID)
}

// natsReady reports whether the adapter holds a non-nil concrete NATS adapter
// that messages can be published through.
func (a *Adapter) natsReady() bool {
	v, ok := a.nats.(*nats.Adapter)
	return ok && v != nil
}
// GetLatest returns the price row that actual_price currently points at for
// the institution called name.
//
// The error from the prepare or scan step is logged and returned as-is, so a
// missing row surfaces as sql.ErrNoRows.
//
// NOTE(review): the first selected column is i.id (the institution ID) and it
// ends up in the returned History.ID, whereas GetSince selects h.id. Confirm
// which ID callers expect.
func (a *Adapter) GetLatest(name string) (*domain.History, error) {
	const query = `
SELECT i.id, i.name, i.short_name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h
JOIN institutions as i ON h.name_id = i.id
JOIN actual_price as ac ON i.id = ac.inst_id AND h.id = ac.hist_id
WHERE i.name = ? AND ac.hist_id = h.id;`

	stmt, err := a.db.Prepare(query)
	if err != nil {
		a.log.Error("preparing stmtt", "error", err.Error())
		return nil, err
	}
	defer stmt.Close()

	var (
		id            int64
		instName      string
		instShort     string
		parser        string
		compra, venta float64
		parsed        int64
	)
	err = stmt.QueryRow(name).Scan(&id, &instName, &instShort, &parser, &compra, &venta, &parsed)
	if err != nil {
		a.log.Error("getting latest", "error", err.Error(), "parser", parser, "name", name)
		return nil, err
	}

	return &domain.History{
		ID:          id,
		Institution: domain.Institution{Name: instName, ShortName: instShort},
		Compra:      compra,
		Venta:       venta,
		Parser:      parser,
		Parsed:      parsed,
	}, nil
}
// GetSince returns the price rows of the institution called name whose parsed
// timestamp is newer than "now minus duration minutes", newest first.
//
// duration is a number of minutes. The result is nil when no row matches.
// Errors from preparing, running or iterating the query are logged and
// returned.
func (a *Adapter) GetSince(name string, duration int64) (hists []*domain.History, err error) {
	tDuration := time.Now().Add(-time.Minute * time.Duration(duration)).Unix()
	a.log.Info("getsince", "name", name, "duration", duration, "unixDuration", tDuration)

	stmt, err := a.db.Prepare("SELECT h.id, i.name, h.parser, h.compra, h.venta, h.parsed FROM histories AS h JOIN institutions as i ON h.name_id = i.id WHERE i.name = ? AND h.parsed > ? ORDER BY parsed DESC;")
	if err != nil {
		a.log.Error("[GetChangeSince] preparing", "error", err.Error())
		return nil, err
	}
	defer stmt.Close()

	rows, err := stmt.Query(name, tDuration)
	if err != nil {
		a.log.Error("[GetChangeSince] querying", "error", err.Error())
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		inst := Institution{}
		hist := History{}
		if err := rows.Scan(&hist.ID, &inst.Name, &hist.Parser, &hist.Compra, &hist.Venta, &hist.Parsed); err != nil {
			a.log.Error("[GetChangeSince] scanning", "error", err)
			return nil, err
		}
		hists = append(hists, &domain.History{
			ID:          hist.ID,
			Institution: domain.Institution{Name: inst.Name},
			Compra:      hist.Compra,
			Venta:       hist.Venta,
			Parser:      hist.Parser,
			Parsed:      hist.Parsed,
		})
	}
	// rows.Next returning false can mean "done" or "failed"; without this
	// check a failure mid-iteration would look like a short, valid result.
	if err := rows.Err(); err != nil {
		a.log.Error("[GetChangeSince] iterating", "error", err)
		return nil, err
	}
	return hists, nil
}
// GetInstByType returns the names of the institutions of the given kind that
// have at least one history row.
//
// name selects the kind by name pattern:
//   - "bancos":  names containing "ban" or "scoti"
//   - "cajas":   names containing "asociacion"
//   - "agentes": names matching none of the patterns above
//
// Any other value yields an error. Each institution is listed once: the join
// with histories only serves as an existence filter, so DISTINCT collapses
// the one-row-per-history duplication. The result is an empty, non-nil slice
// when nothing matches.
func (a *Adapter) GetInstByType(name string) ([]string, error) {
	var query string
	switch name {
	case "bancos":
		query = `
			SELECT DISTINCT i.name
			FROM institutions AS i
			JOIN histories AS h ON i.id = h.name_id
			WHERE (i.name LIKE '%ban%' OR i.name LIKE '%scoti%')`
	case "cajas":
		query = `
			SELECT DISTINCT i.name
			FROM institutions AS i
			JOIN histories AS h ON i.id = h.name_id
			WHERE i.name LIKE '%asociacion%'`
	case "agentes":
		query = `
			SELECT DISTINCT i.name
			FROM institutions AS i
			JOIN histories AS h ON i.id = h.name_id
			WHERE i.name NOT LIKE '%ban%' AND i.name NOT LIKE '%scoti%' AND i.name NOT LIKE '%asociacion%'`
	default:
		err := fmt.Errorf("name not recognized")
		a.log.Error("[inst-GetAll]", "error", err)
		return nil, err
	}

	rows, err := a.db.Query(query)
	if err != nil {
		a.log.Error("[inst-GetAll-stmt]", "error", err)
		return nil, err
	}
	defer rows.Close()

	insts := []string{}
	for rows.Next() {
		var inst string
		if err := rows.Scan(&inst); err != nil {
			return nil, err
		}
		if inst == "" {
			continue // skip blank names
		}
		insts = append(insts, inst)
	}
	// Do not hand back a partial list together with an error.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return insts, nil
}
// ADDInstitution inserts a new institution row with the given name and short
// name and returns the ID assigned to it.
//
// On any failure (prepare, insert, reading the new ID) it returns 0 and the
// error.
//
// NOTE(review): created is written as a formatted date-time string here,
// while Institution.Created is an int64 and TGBSubscribe writes a Unix
// timestamp into its created column. Confirm against schema.sql which format
// is intended.
func (a *Adapter) ADDInstitution(name, short string) (id int64, err error) {
	insert, err := a.db.Prepare("INSERT INTO institutions (name, short_name, created) VALUES(?,?,?);")
	if err != nil {
		return 0, err
	}
	defer insert.Close()

	createdAt := time.Now().Format(time.DateTime)
	result, err := insert.Exec(&name, short, &createdAt)
	if err != nil {
		return 0, err
	}

	if id, err = result.LastInsertId(); err != nil {
		return 0, err
	}
	return id, nil
}
// GETInstitutionByIDS returns the institutions whose IDs appear in ids.
//
// ids is a comma-separated list of integer IDs, e.g. "1,2,3"; whitespace
// around each ID is ignored. An empty or blank ids yields (nil, nil). Any
// element that is not an integer yields an error before the database is
// touched: the IDs are bound as query parameters and never spliced into the
// SQL text.
//
// A row that cannot be scanned is logged and skipped, so the result may hold
// fewer entries than IDs requested.
func (a *Adapter) GETInstitutionByIDS(ids string) (instList []*Institution, err error) {
	args, err := parseIDList(ids)
	if err != nil {
		a.log.Error("parsing institution ids", "error", err.Error(), "id", ids)
		return nil, err
	}
	if len(args) == 0 {
		return nil, nil
	}

	// One "?" per ID: "?,?,?".
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(args)), ",")
	query := "SELECT id, name, short_name FROM institutions WHERE id IN (" + placeholders + ")"

	rows, err := a.db.Query(query, args...)
	if err != nil {
		a.log.Error("getting institution", "error", err.Error(), "id", ids)
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		insti := Institution{}
		if err = rows.Scan(&insti.ID, &insti.Name, &insti.ShortName); err != nil {
			// Best effort: keep the rows that can be read.
			a.log.Error("[getting institution scanning]", "error", err.Error(), "id", ids)
			continue
		}
		instList = append(instList, &insti)
	}
	if err := rows.Err(); err != nil {
		a.log.Error("[getting institution iterating]", "error", err.Error(), "id", ids)
		return nil, err
	}
	return instList, nil
}

// parseIDList turns a comma-separated list of integers into query arguments.
// It returns nil for a blank input and an error for any non-integer element.
func parseIDList(ids string) ([]any, error) {
	if strings.TrimSpace(ids) == "" {
		return nil, nil
	}
	fields := strings.Split(ids, ",")
	args := make([]any, 0, len(fields))
	for _, field := range fields {
		id, err := strconv.ParseInt(strings.TrimSpace(field), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid institution id %q: %w", field, err)
		}
		args = append(args, id)
	}
	return args, nil
}
// GETInstitution looks up one institution by its exact name and returns its
// ID, name and short name.
//
// Prepare and scan errors are logged and returned unchanged, so an unknown
// name surfaces as sql.ErrNoRows. The Created field is not read from the
// database and stays zero.
func (a *Adapter) GETInstitution(name string) (inst *Institution, err error) {
	lookup, err := a.db.Prepare("SELECT i.id, i.name, i.short_name FROM institutions AS i WHERE i.name = ?")
	if err != nil {
		a.log.Error("preparing stmt", "error", err.Error())
		return nil, err
	}
	defer lookup.Close()

	found := &Institution{}
	scanErr := lookup.QueryRow(name).Scan(&found.ID, &found.Name, &found.ShortName)
	if scanErr != nil {
		a.log.Error("getting institution", "error", scanErr.Error(), "short name", found.ShortName, "name", name)
		return nil, scanErr
	}
	return found, nil
}
// AddNew inserts one row into histories for the institution with the given
// id, taking the buy price, sell price and parser label from row.
//
// The parsed column is set to the current Unix time; row.Parsed and
// row.NameID are not used. This method only writes to the database; it does
// not publish anything itself.
func (a *Adapter) AddNew(row History, id int64) error {
	insert, err := a.db.Prepare("INSERT INTO histories (name_id, compra, venta, parser, parsed) VALUES(?,?,?,?,?);")
	if err != nil {
		return err
	}
	defer insert.Close()

	_, err = insert.Exec(&id, &row.Compra, &row.Venta, &row.Parser, time.Now().Unix())
	return err
}
// GetInstByName looks up an institution by its exact name and returns it as a
// domain.Institution.
//
// When the lookup fails (including sql.ErrNoRows for an unknown name) it
// returns nil and that error; the lookup result is nil in that case and must
// not be dereferenced. Created is copied from the lookup result, which
// GETInstitution leaves at zero.
func (a Adapter) GetInstByName(name string) (*domain.Institution, error) {
	inst, err := a.GETInstitution(name)
	if err != nil {
		return nil, err
	}
	return &domain.Institution{
		ID:        inst.ID,
		Name:      inst.Name,
		ShortName: inst.ShortName,
		Created:   inst.Created,
	}, nil
}
// TGBSubscribe inserts a subscription row linking tgb_id to the institution
// called inst_name, stamped with the current Unix time by SQLite.
//
// It returns true when the insert ran without error, otherwise false and the
// error. The institution ID is resolved by a sub-select on the name.
func (a Adapter) TGBSubscribe(tgb_id int64, inst_name string) (bool, error) {
	const query = `
INSERT INTO subscriptions(tgb_id, inst_id, created)
VALUES(?,(SELECT id FROM institutions WHERE name = ?),strftime('%s', 'now'));
`
	insert, err := a.db.Prepare(query)
	if err != nil {
		return false, err
	}
	defer insert.Close()

	if _, err = insert.Exec(tgb_id, inst_name); err != nil {
		return false, err
	}
	return true, nil
}
// TGBUnsubscribe deletes the subscription rows that link tgb_id to the
// institution called inst_name.
//
// It returns true when the delete ran without error, whether or not a row
// was actually removed; on failure it returns false and the error.
func (a Adapter) TGBUnsubscribe(tgb_id int64, inst_name string) (bool, error) {
	const query = `
DELETE FROM subscriptions
WHERE tgb_id = ? AND inst_id = (SELECT id FROM institutions WHERE name = ?)
`
	remove, err := a.db.Prepare(query)
	if err != nil {
		return false, err
	}
	defer remove.Close()

	if _, err = remove.Exec(tgb_id, inst_name); err != nil {
		return false, err
	}
	return true, nil
}
// TGBGetSubscribedUsers returns the tgb_id of every subscription to the
// institution called inst_name.
//
// The result is an empty, non-nil slice when there are no subscribers. Any
// error from preparing, running, scanning or iterating the query is returned
// with a nil slice.
func (a Adapter) TGBGetSubscribedUsers(inst_name string) ([]int64, error) {
	stmt, err := a.db.Prepare(`
		SELECT s.tgb_id FROM subscriptions AS s
		JOIN institutions AS i ON s.inst_id = i.id
		WHERE i.name = ?;
	`)
	if err != nil {
		return nil, err
	}
	defer stmt.Close()

	rows, err := stmt.Query(inst_name)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	tgbList := []int64{}
	for rows.Next() {
		var tgb int64
		if err := rows.Scan(&tgb); err != nil {
			return nil, err
		}
		tgbList = append(tgbList, tgb)
	}
	// A failure during iteration ends the loop just like the last row does;
	// check it so a truncated list is not reported as complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return tgbList, nil
}
// TGBGetSubscribedInst returns the names of the institutions that tgb_id is
// subscribed to.
//
// The result is an empty, non-nil slice when there are no subscriptions. Any
// error from preparing, running, scanning or iterating the query is returned
// with a nil slice.
func (a Adapter) TGBGetSubscribedInst(tgb_id int64) ([]string, error) {
	stmt, err := a.db.Prepare(`
		SELECT i.name FROM subscriptions AS s
		JOIN institutions AS i ON s.inst_id = i.id
		WHERE s.tgb_id = ?;
	`)
	if err != nil {
		return nil, err
	}
	defer stmt.Close()

	rows, err := stmt.Query(tgb_id)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	instList := []string{}
	for rows.Next() {
		var inst string
		if err := rows.Scan(&inst); err != nil {
			return nil, err
		}
		instList = append(instList, inst)
	}
	// A failure during iteration ends the loop just like the last row does;
	// check it so a truncated list is not reported as complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return instList, nil
}