From a33ec14c5ffddc3bc67c445ef341460bae6b3184 Mon Sep 17 00:00:00 2001 From: qdust41 Date: Mon, 6 Apr 2026 23:05:04 -0400 Subject: [PATCH] Integrating clickhouse for metrics. --- config/config.exs | 7 +- config/dev.exs | 9 + config/runtime.exs | 18 ++ config/test.exs | 9 + lib/mixer/application.ex | 4 + lib/mixer/clickhouse_repo.ex | 13 + lib/mixer/metrics.ex | 263 ++++++++++++++++++ lib/mixer/metrics/buffer.ex | 147 ++++++++++ lib/mixer/metrics/post_event.ex | 40 +++ mix.exs | 3 +- mix.lock | 2 + priv/clickhouse/migrations/.formatter.exs | 4 + .../20260407000001_create_post_events.exs | 49 ++++ priv/clickhouse/seeds.exs | 11 + 14 files changed, 577 insertions(+), 2 deletions(-) create mode 100644 lib/mixer/clickhouse_repo.ex create mode 100644 lib/mixer/metrics.ex create mode 100644 lib/mixer/metrics/buffer.ex create mode 100644 lib/mixer/metrics/post_event.ex create mode 100644 priv/clickhouse/migrations/.formatter.exs create mode 100644 priv/clickhouse/migrations/20260407000001_create_post_events.exs create mode 100644 priv/clickhouse/seeds.exs diff --git a/config/config.exs b/config/config.exs index cd278bf..499502c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -94,7 +94,7 @@ config :spark, ] config :mixer, - ecto_repos: [Mixer.Repo], + ecto_repos: [Mixer.Repo, Mixer.ClickhouseRepo], generators: [timestamp_type: :utc_datetime], ash_domains: [Mixer.Accounts, Mixer.Posts], ash_authentication: [return_error_on_invalid_magic_link_token?: true] @@ -158,6 +158,11 @@ config :logger, :default_formatter, # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason +# ClickHouse repo — migrations live in priv/clickhouse/migrations +config :mixer, Mixer.ClickhouseRepo, + priv: "priv/clickhouse", + migration_source: "ch_schema_migrations" + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/config/dev.exs b/config/dev.exs index 047324d..f65dd58 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -106,3 +106,12 @@ config :ex_aws, :s3, config :waffle, bucket: "mixer-bucket", asset_host: "http://localhost:9000" + +# ClickHouse (default local install) +config :mixer, Mixer.ClickhouseRepo, + scheme: "http", + hostname: "localhost", + port: 8123, + database: "mixer_metrics", + username: "default", + password: "" diff --git a/config/runtime.exs b/config/runtime.exs index 05cd20d..3c752d4 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -22,6 +22,11 @@ end config :mixer, MixerWeb.Endpoint, http: [port: String.to_integer(System.get_env("PORT", "4000"))] +# ClickHouse is available in all environments via env vars when set +if clickhouse_url = System.get_env("CLICKHOUSE_URL") do + config :mixer, Mixer.ClickhouseRepo, url: clickhouse_url +end + if config_env() == :prod do database_url = System.get_env("DATABASE_URL") || @@ -40,6 +45,19 @@ if config_env() == :prod do # pool_count: 4, socket_options: maybe_ipv6 + # ClickHouse — configure via CLICKHOUSE_URL or individual vars + unless System.get_env("CLICKHOUSE_URL") do + config :mixer, Mixer.ClickhouseRepo, + scheme: System.get_env("CLICKHOUSE_SCHEME", "http"), + hostname: + System.get_env("CLICKHOUSE_HOST") || + raise("Missing environment variable `CLICKHOUSE_HOST`!"), + port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "8123")), + database: System.get_env("CLICKHOUSE_DATABASE", "mixer_metrics"), + username: System.get_env("CLICKHOUSE_USERNAME", "default"), + password: System.get_env("CLICKHOUSE_PASSWORD", "") + end + # The secret key base is used to sign/encrypt cookies and other secrets. # A default value is used in config/dev.exs and config/test.exs but you # want to use a different value for prod and you most likely don't want diff --git a/config/test.exs b/config/test.exs index 7519e3f..55b13b4 100644 --- a/config/test.exs +++ b/config/test.exs @@ -42,3 +42,12 @@ config :phoenix_live_view, # Sort query params output of verified routes for robust url comparisons config :phoenix, sort_verified_routes_query_params: true + +# ClickHouse — point at a dedicated test database +config :mixer, Mixer.ClickhouseRepo, + scheme: "http", + hostname: "localhost", + port: 8123, + database: "mixer_metrics_test", + username: "default", + password: "" diff --git a/lib/mixer/application.ex b/lib/mixer/application.ex index b716d3a..c59822e 100644 --- a/lib/mixer/application.ex +++ b/lib/mixer/application.ex @@ -10,6 +10,10 @@ defmodule Mixer.Application do children = [ MixerWeb.Telemetry, Mixer.Repo, + # ClickHouse repo for analytics — started before the metrics buffer + Mixer.ClickhouseRepo, + # In-memory event buffer that batches writes to ClickHouse + Mixer.Metrics.Buffer, {DNSCluster, query: Application.get_env(:mixer, :dns_cluster_query) || :ignore}, {Phoenix.PubSub, name: Mixer.PubSub}, # Start a worker by calling: Mixer.Worker.start_link(arg) diff --git a/lib/mixer/clickhouse_repo.ex b/lib/mixer/clickhouse_repo.ex new file mode 100644 index 0000000..bc0d838 --- /dev/null +++ b/lib/mixer/clickhouse_repo.ex @@ -0,0 +1,13 @@ +defmodule Mixer.ClickhouseRepo do + @moduledoc """ + Ecto repository for ClickHouse, backed by the `ecto_ch` / `Ch` adapter. + + Used exclusively for analytics writes (via `Mixer.Metrics.Buffer`) and + read queries (via `Mixer.Metrics`). It is **not** an Ash repo and must + never be used for transactional application data. + """ + + use Ecto.Repo, + otp_app: :mixer, + adapter: Ecto.Adapters.ClickHouse +end diff --git a/lib/mixer/metrics.ex b/lib/mixer/metrics.ex new file mode 100644 index 0000000..2fc7378 --- /dev/null +++ b/lib/mixer/metrics.ex @@ -0,0 +1,263 @@ +defmodule Mixer.Metrics do + @moduledoc """ + Public API for tracking and querying post (tweet) metrics via ClickHouse. + + ## Tracking events + + Tracking calls are non-blocking — events are handed off to the in-memory + `Mixer.Metrics.Buffer` GenServer and written to ClickHouse in batches. + + # Record a tweet view (anonymous) + Mixer.Metrics.track_view(tweet_id) + + # Record a view with a logged-in user and their IP + Mixer.Metrics.track_view(tweet_id, user_id: user.id, ip_address: conn.remote_ip) + + ## Querying metrics + + Query functions execute synchronous ClickHouse SQL and return plain maps. + + {:ok, summary} = Mixer.Metrics.get_summary(tweet_id) + # => %{views: 42, likes: 7, unlikes: 1, comments: 3, shares: 0} + + {:ok, rows} = Mixer.Metrics.get_top_posts(10) + # => [%{tweet_id: "...", views: 99}, ...] + """ + + require Logger + + alias Mixer.ClickhouseRepo + alias Mixer.Metrics.Buffer + + # --------------------------------------------------------------------------- + # Event types + # --------------------------------------------------------------------------- + + @type event_type :: :view | :like | :unlike | :comment | :share + + @type track_opt :: + {:user_id, binary() | nil} + | {:ip_address, binary() | :inet.ip_address() | nil} + + # --------------------------------------------------------------------------- + # Tracking helpers + # --------------------------------------------------------------------------- + + @doc """ + Track a tweet view event. + + ## Options + + * `:user_id` — UUID of the viewing user (nil for anonymous) + * `:ip_address` — originating IP; accepts a string or an `:inet` tuple + """ + @spec track_view(binary(), [track_opt()]) :: :ok + def track_view(tweet_id, opts \\ []), do: enqueue("view", tweet_id, opts) + + @doc "Track a tweet like event." + @spec track_like(binary(), [track_opt()]) :: :ok + def track_like(tweet_id, opts \\ []), do: enqueue("like", tweet_id, opts) + + @doc "Track a tweet unlike event." + @spec track_unlike(binary(), [track_opt()]) :: :ok + def track_unlike(tweet_id, opts \\ []), do: enqueue("unlike", tweet_id, opts) + + @doc "Track a comment (reply) event on a tweet." + @spec track_comment(binary(), [track_opt()]) :: :ok + def track_comment(tweet_id, opts \\ []), do: enqueue("comment", tweet_id, opts) + + @doc "Track a tweet share / repost event." + @spec track_share(binary(), [track_opt()]) :: :ok + def track_share(tweet_id, opts \\ []), do: enqueue("share", tweet_id, opts) + + # --------------------------------------------------------------------------- + # Query helpers + # --------------------------------------------------------------------------- + + @doc """ + Return a summary of all event counts for a single tweet. + + Returns `{:ok, map}` on success or `{:error, reason}` on failure. + + ## Example + + {:ok, %{views: 12, likes: 3, unlikes: 0, comments: 5, shares: 1}} = + Mixer.Metrics.get_summary(tweet_id) + """ + @spec get_summary(binary()) :: {:ok, map()} | {:error, term()} + def get_summary(tweet_id) do + sql = """ + SELECT + countIf(event_type = 'view') AS views, + countIf(event_type = 'like') AS likes, + countIf(event_type = 'unlike') AS unlikes, + countIf(event_type = 'comment') AS comments, + countIf(event_type = 'share') AS shares + FROM post_events + WHERE tweet_id = {tweet_id:String} + """ + + case ClickhouseRepo.query(sql, %{"tweet_id" => tweet_id}) do + {:ok, result} -> + {:ok, row_to_summary(result)} + + {:error, reason} -> + Logger.error("[Mixer.Metrics] get_summary failed for #{tweet_id}: #{inspect(reason)}") + {:error, reason} + end + end + + @doc """ + Return view counts bucketed by UTC hour for the past `hours` hours. + + Useful for rendering a sparkline on a tweet detail page. + + ## Example + + {:ok, rows} = Mixer.Metrics.get_hourly_views(tweet_id, 24) + # => [%{hour: ~N[2026-04-07 00:00:00], views: 5}, ...] + """ + @spec get_hourly_views(binary(), pos_integer()) :: {:ok, [map()]} | {:error, term()} + def get_hourly_views(tweet_id, hours \\ 24) when is_integer(hours) and hours > 0 do + sql = """ + SELECT + toStartOfHour(occurred_at) AS hour, + count() AS views + FROM post_events + WHERE + tweet_id = {tweet_id:String} + AND event_type = 'view' + AND occurred_at >= now() - toIntervalHour({hours:UInt32}) + GROUP BY hour + ORDER BY hour ASC + """ + + case ClickhouseRepo.query(sql, %{"tweet_id" => tweet_id, "hours" => hours}) do + {:ok, %{rows: rows}} -> + {:ok, Enum.map(rows, fn [hour, views] -> %{hour: hour, views: views} end)} + + {:error, reason} -> + Logger.error("[Mixer.Metrics] get_hourly_views failed: #{inspect(reason)}") + {:error, reason} + end + end + + @doc """ + Return the top `limit` tweets ordered by total view count across all time. + + ## Example + + {:ok, rows} = Mixer.Metrics.get_top_posts(10) + # => [%{tweet_id: "...", views: 99}, %{tweet_id: "...", views: 72}, ...] + """ + @spec get_top_posts(pos_integer()) :: {:ok, [map()]} | {:error, term()} + def get_top_posts(limit \\ 10) when is_integer(limit) and limit > 0 do + sql = """ + SELECT + tweet_id, + countIf(event_type = 'view') AS views + FROM post_events + GROUP BY tweet_id + ORDER BY views DESC + LIMIT {limit:UInt32} + """ + + case ClickhouseRepo.query(sql, %{"limit" => limit}) do + {:ok, %{rows: rows}} -> + {:ok, Enum.map(rows, fn [tweet_id, views] -> %{tweet_id: tweet_id, views: views} end)} + + {:error, reason} -> + Logger.error("[Mixer.Metrics] get_top_posts failed: #{inspect(reason)}") + {:error, reason} + end + end + + @doc """ + Return per-event-type counts for a list of tweet IDs in a single query. + + Handy for batch-enriching a feed with metrics without N+1 queries. + + ## Example + + {:ok, map} = Mixer.Metrics.get_bulk_summaries(tweet_ids) + # => %{"" => %{views: 5, likes: 2, ...}, ...} + """ + @spec get_bulk_summaries([binary()]) :: {:ok, %{binary() => map()}} | {:error, term()} + def get_bulk_summaries([]), do: {:ok, %{}} + + def get_bulk_summaries(tweet_ids) when is_list(tweet_ids) do + # ecto_ch supports passing arrays as query parameters + sql = """ + SELECT + tweet_id, + countIf(event_type = 'view') AS views, + countIf(event_type = 'like') AS likes, + countIf(event_type = 'unlike') AS unlikes, + countIf(event_type = 'comment') AS comments, + countIf(event_type = 'share') AS shares + FROM post_events + WHERE tweet_id IN {tweet_ids:Array(String)} + GROUP BY tweet_id + """ + + case ClickhouseRepo.query(sql, %{"tweet_ids" => tweet_ids}) do + {:ok, %{rows: rows}} -> + summaries = + Map.new(rows, fn [tweet_id, views, likes, unlikes, comments, shares] -> + {tweet_id, + %{ + views: views, + likes: likes, + unlikes: unlikes, + comments: comments, + shares: shares + }} + end) + + {:ok, summaries} + + {:error, reason} -> + Logger.error("[Mixer.Metrics] get_bulk_summaries failed: #{inspect(reason)}") + {:error, reason} + end + end + + # --------------------------------------------------------------------------- + # Private helpers + # --------------------------------------------------------------------------- + + defp enqueue(event_type, tweet_id, opts) do + event = %{ + event_type: event_type, + tweet_id: tweet_id, + user_id: Keyword.get(opts, :user_id), + occurred_at: DateTime.utc_now() |> DateTime.truncate(:second), + ip_address: opts |> Keyword.get(:ip_address) |> format_ip() + } + + Buffer.track(event) + end + + defp format_ip(nil), do: nil + defp format_ip(ip) when is_binary(ip), do: ip + + defp format_ip({a, b, c, d}), do: "#{a}.#{b}.#{c}.#{d}" + + defp format_ip({a, b, c, d, e, f, g, h}) do + [a, b, c, d, e, f, g, h] + |> Enum.map_join(":", &Integer.to_string(&1, 16)) + end + + defp row_to_summary(%{rows: [[views, likes, unlikes, comments, shares] | _]}) do + %{ + views: views, + likes: likes, + unlikes: unlikes, + comments: comments, + shares: shares + } + end + + # ClickHouse returns no rows when the tweet has zero events — default to 0 + defp row_to_summary(_), do: %{views: 0, likes: 0, unlikes: 0, comments: 0, shares: 0} +end diff --git a/lib/mixer/metrics/buffer.ex b/lib/mixer/metrics/buffer.ex new file mode 100644 index 0000000..23366e0 --- /dev/null +++ b/lib/mixer/metrics/buffer.ex @@ -0,0 +1,147 @@ +defmodule Mixer.Metrics.Buffer do + @moduledoc """ + GenServer that accumulates post metric events in memory and flushes them + to ClickHouse in batches. + + Two conditions trigger a flush: + + 1. **Timer** — every `@flush_interval` milliseconds (default 10 s). + 2. **Threshold** — whenever the in-memory buffer reaches `@max_buffer_size` + rows (default 500). + + If ClickHouse is unavailable the error is logged and the buffered events + are discarded rather than retried indefinitely, preventing unbounded memory + growth. For production deployments that require durability, consider adding + a persistent queue in front of this buffer. + """ + + use GenServer + + require Logger + + alias Mixer.Metrics.PostEvent + + @flush_interval :timer.seconds(10) + @max_buffer_size 500 + + # --------------------------------------------------------------------------- + # Public API + # --------------------------------------------------------------------------- + + @doc """ + Start the buffer process and link it to the calling process. + + Accepts an optional keyword list of overrides: + + * `:flush_interval` — milliseconds between scheduled flushes + * `:max_buffer_size` — row count that triggers an immediate flush + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc """ + Enqueue a single analytics event map for buffered insertion into ClickHouse. + + The map must contain at minimum the fields required by `Mixer.Metrics.PostEvent`: + `:event_type`, `:tweet_id`, `:occurred_at`. Other fields are optional. + + This call is asynchronous (cast) and returns `:ok` immediately. + """ + @spec track(map()) :: :ok + def track(event) when is_map(event) do + GenServer.cast(__MODULE__, {:track, event}) + end + + @doc """ + Force an immediate flush of all buffered events to ClickHouse, regardless + of the timer or threshold. Returns `:ok` after the flush completes. + + Primarily useful in tests. + """ + @spec flush() :: :ok + def flush do + GenServer.call(__MODULE__, :flush) + end + + # --------------------------------------------------------------------------- + # GenServer callbacks + # --------------------------------------------------------------------------- + + @impl GenServer + def init(opts) do + flush_interval = Keyword.get(opts, :flush_interval, @flush_interval) + max_buffer_size = Keyword.get(opts, :max_buffer_size, @max_buffer_size) + + schedule_flush(flush_interval) + + state = %{ + events: [], + count: 0, + flush_interval: flush_interval, + max_buffer_size: max_buffer_size + } + + {:ok, state} + end + + @impl GenServer + def handle_cast({:track, event}, state) do + new_count = state.count + 1 + new_events = [event | state.events] + + if new_count >= state.max_buffer_size do + do_flush(new_events) + {:noreply, %{state | events: [], count: 0}} + else + {:noreply, %{state | events: new_events, count: new_count}} + end + end + + @impl GenServer + def handle_call(:flush, _from, state) do + do_flush(state.events) + {:reply, :ok, %{state | events: [], count: 0}} + end + + @impl GenServer + def handle_info(:flush, state) do + do_flush(state.events) + schedule_flush(state.flush_interval) + {:noreply, %{state | events: [], count: 0}} + end + + @impl GenServer + def terminate(_reason, state) do + # Best-effort flush on shutdown so we don't lose buffered events during + # graceful stops (e.g., deploys). + do_flush(state.events) + :ok + end + + # --------------------------------------------------------------------------- + # Private helpers + # --------------------------------------------------------------------------- + + defp do_flush([]), do: :ok + + defp do_flush(events) do + rows = Enum.reverse(events) + + try do + {count, _} = Mixer.ClickhouseRepo.insert_all(PostEvent, rows) + Logger.debug("[Mixer.Metrics.Buffer] Flushed #{count} event(s) to ClickHouse") + rescue + error -> + Logger.error( + "[Mixer.Metrics.Buffer] Failed to flush #{length(rows)} event(s) to ClickHouse: " <> + Exception.message(error) + ) + end + end + + defp schedule_flush(interval) do + Process.send_after(self(), :flush, interval) + end +end diff --git a/lib/mixer/metrics/post_event.ex b/lib/mixer/metrics/post_event.ex new file mode 100644 index 0000000..191d6d0 --- /dev/null +++ b/lib/mixer/metrics/post_event.ex @@ -0,0 +1,40 @@ +defmodule Mixer.Metrics.PostEvent do + @moduledoc """ + Ecto schema that maps to the `post_events` table in ClickHouse. + + Each row represents a single analytics event tied to a tweet (post). + The table uses a MergeTree engine ordered by `(occurred_at, event_type, + tweet_id)` for efficient time-range scans and per-tweet aggregations. + + ## Event types + + | event_type | Description | + |-------------|------------------------------------------| + | `"view"` | A tweet was displayed to a user | + | `"like"` | A user liked a tweet | + | `"unlike"` | A user removed their like from a tweet | + | `"comment"` | A user replied to a tweet | + | `"share"` | A user shared / reposted a tweet | + """ + + use Ecto.Schema + + @primary_key false + + schema "post_events" do + # LowCardinality(String) in ClickHouse — keep values in the set above + field :event_type, :string + + # The tweet that the event relates to + field :tweet_id, Ecto.UUID + + # The acting user; may be nil for anonymous views + field :user_id, Ecto.UUID + + # Wall-clock time of the event (UTC, second precision) + field :occurred_at, :utc_datetime + + # Optional originating IP, useful for deduplicating anonymous views + field :ip_address, :string + end +end diff --git a/mix.exs b/mix.exs index 8ec099f..acd8766 100644 --- a/mix.exs +++ b/mix.exs @@ -91,7 +91,8 @@ defmodule Mixer.MixProject do {:ex_aws, "~> 2.1.2"}, {:ex_aws_s3, "~> 2.0"}, {:hackney, "~> 1.9"}, - {:sweet_xml, "~> 0.6"} + {:sweet_xml, "~> 0.6"}, + {:ecto_ch, "~> 0.3"} ] end diff --git a/mix.lock b/mix.lock index 79d5771..8ca6851 100644 --- a/mix.lock +++ b/mix.lock @@ -20,6 +20,7 @@ "castore": {:hex, :castore, "1.0.18", "5e43ef0ec7d31195dfa5a65a86e6131db999d074179d2ba5a8de11fe14570f55", [:mix], [], "hexpm", "f393e4fe6317829b158fb74d86eb681f737d2fe326aa61ccf6293c4104957e34"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"}, "certifi": {:hex, :certifi, "2.15.0", "0e6e882fcdaaa0a5a9f2b3db55b1394dba07e8d6d9bcad08318fb604c6839712", [:rebar3], [], "hexpm", "b147ed22ce71d72eafdad94f055165c1c182f61a2ff49df28bcc71d1d5b94a60"}, + "ch": {:hex, :ch, "0.7.1", "116c08094b30d095c3bd6a8fe4ebe19fdaaf3dce84e2413cfdd6af157baf6303", [:mix], [{:db_connection, "~> 2.9.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "3c1c900291ff9c4c077cd1dc0c265051a3f1d26320d58b37ed9e91b33d41a868"}, "cinder": {:hex, :cinder, "0.12.1", "02ae4988e025fb32c37e4e7f2e491586b952918c0dd99d856da13271cd680e16", [:mix], [{:ash, "~> 3.0", [hex: :ash, repo: "hexpm", optional: false]}, {:ash_phoenix, "~> 2.3", [hex: :ash_phoenix, repo: "hexpm", optional: false]}, {:gettext, "~> 1.0.0", [hex: :gettext, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}], "hexpm", "a48b5677c1f57619d9d7564fb2bd7928f93750a2e8c0b1b145852a30ecf2aa20"}, "comeonin": {:hex, :comeonin, "5.5.1", "5113e5f3800799787de08a6e0db307133850e635d34e9fab23c70b6501669510", [:mix], [], "hexpm", "65aac8f19938145377cee73973f192c5645873dcf550a8a6b18187d17c13ccdb"}, "conv_case": {:hex, :conv_case, "0.2.3", "c1455c27d3c1ffcdd5f17f1e91f40b8a0bc0a337805a6e8302f441af17118ed8", [:mix], [], "hexpm", "88f29a3d97d1742f9865f7e394ed3da011abb7c5e8cc104e676fdef6270d4b4a"}, @@ -29,6 +30,7 @@ "dns_cluster": {:hex, :dns_cluster, "0.2.0", "aa8eb46e3bd0326bd67b84790c561733b25c5ba2fe3c7e36f28e88f384ebcb33", [:mix], [], "hexpm", "ba6f1893411c69c01b9e8e8f772062535a4cf70f3f35bcc964a324078d8c8240"}, "dotenvy": {:hex, :dotenvy, "1.1.1", "00e318f3c51de9fafc4b48598447e386f19204dc18ca69886905bb8f8b08b667", [:mix], [], "hexpm", "c8269471b5701e9e56dc86509c1199ded2b33dce088c3471afcfef7839766d8e"}, "ecto": {:hex, :ecto, "3.13.5", "9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, + "ecto_ch": {:hex, :ecto_ch, "0.8.6", "f31b507e86690c003f46e75d6e742e6b5d8ce34b6b10a86604b1c3aa785e0b56", [:mix], [{:ch, "~> 0.5.0 or ~> 0.6.0 or ~> 0.7.0", [hex: :ch, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.13.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "6ca9f1cf9680452b1925c6a3a7b5e3d8b12e38ee134b03c6a45a8b26434fad97"}, "ecto_sql": {:hex, :ecto_sql, "3.13.5", "2f8282b2ad97bf0f0d3217ea0a6fff320ead9e2f8770f810141189d182dc304e", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aa36751f4e6a2b56ae79efb0e088042e010ff4935fc8684e74c23b1f49e25fdc"}, "elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"}, "esbuild": {:hex, :esbuild, "0.10.0", "b0aa3388a1c23e727c5a3e7427c932d89ee791746b0081bbe56103e9ef3d291f", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "468489cda427b974a7cc9f03ace55368a83e1a7be12fba7e30969af78e5f8c70"}, diff --git a/priv/clickhouse/migrations/.formatter.exs b/priv/clickhouse/migrations/.formatter.exs new file mode 100644 index 0000000..dd50c95 --- /dev/null +++ b/priv/clickhouse/migrations/.formatter.exs @@ -0,0 +1,4 @@ +[ + import_deps: [:ecto_ch], + inputs: ["*.exs"] +] diff --git a/priv/clickhouse/migrations/20260407000001_create_post_events.exs b/priv/clickhouse/migrations/20260407000001_create_post_events.exs new file mode 100644 index 0000000..43900c8 --- /dev/null +++ b/priv/clickhouse/migrations/20260407000001_create_post_events.exs @@ -0,0 +1,49 @@ +defmodule Mixer.ClickhouseRepo.Migrations.CreatePostEvents do + use Ecto.Migration + + @doc """ + Creates the `post_events` table using a MergeTree engine. + + Key design decisions: + + * `LowCardinality(String)` for `event_type` — the cardinality is tiny + (5–10 values), so ClickHouse can store it as a dictionary, giving both + compression and faster filtering. + + * `Nullable(UUID)` / `Nullable(String)` for optional columns — ClickHouse + handles NULLs differently from PostgreSQL; we make the nullable fields + explicit so the schema is unambiguous. + + * `ORDER BY (occurred_at, event_type, tweet_id)` — optimises the two most + common query patterns: + 1. Time-range scans (`WHERE occurred_at >= now() - interval 24 HOUR`) + 2. Per-tweet aggregations (`WHERE tweet_id = ?`) + + * `PARTITION BY toYYYYMM(occurred_at)` — monthly partitions make it cheap + to drop old data with `ALTER TABLE … DROP PARTITION`. + + * `TTL occurred_at + INTERVAL 1 YEAR DELETE` — automatically reclaim disk + space after two years. Adjust as required. + """ + def up do + execute(""" + CREATE TABLE IF NOT EXISTS post_events + ( + event_type LowCardinality(String), + tweet_id UUID, + user_id Nullable(UUID), + occurred_at DateTime, + ip_address Nullable(String) + ) + ENGINE = MergeTree() + PARTITION BY toYYYYMM(occurred_at) + ORDER BY (occurred_at, event_type, tweet_id) + TTL occurred_at + INTERVAL 1 YEAR DELETE + SETTINGS index_granularity = 8192 + """) + end + + def down do + execute("DROP TABLE IF EXISTS post_events") + end +end diff --git a/priv/clickhouse/seeds.exs b/priv/clickhouse/seeds.exs new file mode 100644 index 0000000..dd79cfb --- /dev/null +++ b/priv/clickhouse/seeds.exs @@ -0,0 +1,11 @@ +# Script for populating the database. You can run it as: +# +# mix run priv/clickhouse/seeds.exs +# +# Inside the script, you can read and write to any of your +# repositories directly: +# +# Mixer.ClickhouseRepo.insert!(%Mixer.SomeSchema{}) +# +# We recommend using the bang functions (`insert!`, `update!` +# and so on) as they will fail if something goes wrong.