with '#' will be ignored, and an empty message aborts the commit. On branch main Initial commit Changes to be committed: new file: .DS_Store new file: .env new file: .gitignore new file: ai-worker/Dockerfile new file: ai-worker/requirements.txt new file: ai-worker/worker.py new file: background-worker/Dockerfile new file: background-worker/go.mod new file: background-worker/go.sum new file: background-worker/main.go new file: background-worker/market.go new file: background-worker/rmv.go new file: background-worker/rss.go new file: background-worker/sql_work.go new file: db/Dockerfile new file: db/init.sql new file: docker-compose.yml new file: server-app/dockerfile new file: server-app/go.mod new file: server-app/go.sum new file: server-app/main.go new file: volumes/.DS_Store new file: volumes/db-init/.DS_Store new file: volumes/db-init/data/news_rss_feeds.csv new file: volumes/web/.DS_Store new file: volumes/web/static/css/blog.css new file: volumes/web/static/css/index-lite.css new file: volumes/web/static/css/index.css new file: volumes/web/static/css/mandelbrot.css new file: volumes/web/static/img/minecraft.png new file: volumes/web/static/js/blog.js new file: volumes/web/static/js/index-lite.js new file: volumes/web/static/js/index.js new file: volumes/web/static/js/mandelbrot.js new file: volumes/web/static/media/cantina.mp3 new file: volumes/web/static/media/countdowns.json new file: volumes/web/static/media/gong.mp4 new file: volumes/web/template/blog.html new file: volumes/web/template/index-lite.html new file: volumes/web/template/index.html new file: volumes/web/template/mandelbrot.html
218 lines
4.8 KiB
Go
218 lines
4.8 KiB
Go
package main
|
|
|
|
import (
	"context"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"net/url"
	"os"
	"time"

	"github.com/jackc/pgx/v5"
)
|
|
|
|
// getenv looks up the environment variable k and returns its value. When the
// variable is unset or set to the empty string, fallback is returned instead.
func getenv(k, fallback string) string {
	value := os.Getenv(k)
	if value == "" {
		return fallback
	}
	return value
}
|
|
|
|
func buildDSN() string {
|
|
if url := os.Getenv("DATABASE_URL"); url != "" {
|
|
return url
|
|
}
|
|
|
|
host := getenv("POSTGRES_HOST", "database")
|
|
port := getenv("POSTGRES_PORT", "5432")
|
|
user := getenv("POSTGRES_USER", "app")
|
|
pass := getenv("POSTGRES_PASSWORD", "appsecret")
|
|
name := getenv("POSTGRES_DB", "appdb")
|
|
|
|
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", user, pass, host, port, name)
|
|
}
|
|
|
|
// FeedMeta carries the two columns fetchPendingFeeds reads from the feeds
// table for every feed that is due for a check.
type FeedMeta struct {
	// ID is the value of the feeds.id column.
	ID int
	// URL is the value of the feeds.url column.
	URL string
}
|
|
|
|
// Article is a single feed entry as written to the articles table by
// saveArticles.
type Article struct {
	// ArticleID is the conflict key of the insert. saveArticles derives it
	// from Link when it is empty.
	ArticleID string
	// FeedID is not read by saveArticles, which uses its own feedID argument.
	FeedID int
	// Title, Link and Summary are stored as-is in the columns of the same
	// (lower-cased) names.
	Title, Link, Summary string
	// PublishedAt is a pointer so that a missing timestamp can be expressed
	// as nil.
	PublishedAt *time.Time
}
|
|
|
|
// MarketQuote is one bid price for an instrument, as upserted into the
// market_quotes table by saveMarketQuotes.
type MarketQuote struct {
	// Instrument is the conflict key of the upsert: a later quote for the
	// same instrument overwrites the stored one.
	Instrument string
	// Bid is stored in the bid column.
	Bid float64
	// QuotedAt is stored in the quoted_at column.
	QuotedAt time.Time
}
|
|
|
|
func writeVerbindungToSQL(verbindungen [][]Verbindung) error {
|
|
ctx := context.Background()
|
|
dsn := buildDSN()
|
|
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("connect db: %w", err)
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
tx, err := conn.Begin(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
if _, err := tx.Exec(ctx, "DELETE FROM rmv_data"); err != nil {
|
|
return fmt.Errorf("clear rmv_data: %w", err)
|
|
}
|
|
|
|
stmt := `INSERT INTO rmv_data (trip_index, leg_index, linie, abfahrt, ankunft, von, nach) VALUES ($1,$2,$3,$4,$5,$6,$7)`
|
|
|
|
for tripIdx, trip := range verbindungen {
|
|
for legIdx, leg := range trip {
|
|
if _, err := tx.Exec(ctx, stmt, tripIdx, legIdx, leg.Linie, leg.Abfahrt, leg.Ankunft, leg.Von, leg.Nach); err != nil {
|
|
return fmt.Errorf("insert trip %d leg %d: %w", tripIdx, legIdx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("commit tx: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func fetchPendingFeeds(ctx context.Context) ([]FeedMeta, error) {
|
|
dsn := buildDSN()
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connect db: %w", err)
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
rows, err := conn.Query(ctx, `
|
|
SELECT id, url
|
|
FROM feeds
|
|
WHERE access = TRUE
|
|
AND (last_checked IS NULL OR last_checked < now() - interval '5 minutes')
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query feeds: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var feeds []FeedMeta
|
|
for rows.Next() {
|
|
var f FeedMeta
|
|
if err := rows.Scan(&f.ID, &f.URL); err != nil {
|
|
return nil, fmt.Errorf("scan feed: %w", err)
|
|
}
|
|
feeds = append(feeds, f)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate feeds: %w", err)
|
|
}
|
|
|
|
return feeds, nil
|
|
}
|
|
|
|
// articleIDFromLink derives a stable article identifier from a link: the
// SHA-1 digest of the link's bytes, rendered as 40 lowercase hex characters.
// The same link always maps to the same identifier.
func articleIDFromLink(link string) string {
	hasher := sha1.New()
	hasher.Write([]byte(link)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}
|
|
|
|
func saveArticles(ctx context.Context, feedID int, articles []Article) error {
|
|
dsn := buildDSN()
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("connect db: %w", err)
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
tx, err := conn.Begin(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
stmt := `
|
|
INSERT INTO articles (article_id, feed_id, title, link, summary, published_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
ON CONFLICT (article_id) DO NOTHING
|
|
`
|
|
|
|
for _, a := range articles {
|
|
aid := a.ArticleID
|
|
if aid == "" {
|
|
aid = articleIDFromLink(a.Link)
|
|
}
|
|
|
|
if _, err := tx.Exec(ctx, stmt, aid, feedID, a.Title, a.Link, a.Summary, a.PublishedAt); err != nil {
|
|
return fmt.Errorf("insert article for feed %d: %w", feedID, err)
|
|
}
|
|
}
|
|
|
|
if _, err := tx.Exec(ctx, `UPDATE feeds SET last_checked = now() WHERE id = $1`, feedID); err != nil {
|
|
return fmt.Errorf("update feed timestamp: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("commit tx: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setFeedAccess(ctx context.Context, feedID int, access bool) error {
|
|
dsn := buildDSN()
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("connect db: %w", err)
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
if _, err := conn.Exec(ctx, `UPDATE feeds SET access = $1 WHERE id = $2`, access, feedID); err != nil {
|
|
return fmt.Errorf("update feed access: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func saveMarketQuotes(ctx context.Context, quotes []MarketQuote) error {
|
|
dsn := buildDSN()
|
|
conn, err := pgx.Connect(ctx, dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("connect db: %w", err)
|
|
}
|
|
defer conn.Close(ctx)
|
|
|
|
batch := &pgx.Batch{}
|
|
for _, q := range quotes {
|
|
batch.Queue(
|
|
`INSERT INTO market_quotes (instrument, bid, quoted_at)
|
|
VALUES ($1, $2, $3)
|
|
ON CONFLICT (instrument) DO UPDATE
|
|
SET bid = EXCLUDED.bid, quoted_at = EXCLUDED.quoted_at`,
|
|
q.Instrument, q.Bid, q.QuotedAt,
|
|
)
|
|
}
|
|
|
|
br := conn.SendBatch(ctx, batch)
|
|
defer br.Close()
|
|
|
|
for range quotes {
|
|
if _, err := br.Exec(); err != nil {
|
|
return fmt.Errorf("insert market quote: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|