Anyone can write a 20-line script that hits an API in a loop. The script works on Tuesday afternoon. It dies on Friday at 3 a.m. when the network blips, the upstream returns a 429, and a single retry storm exhausts your credit balance before your alerting catches up.
Production pipelines are different. They assume failure is the default state and success is what you engineer toward. When you depend on a third-party API like TikLiveAPI, the difference between a "demo" and a "production system" comes down to eight unglamorous patterns: backoff, token buckets, concurrency control, idempotency, circuit breakers, persistent queues, observability, and budget awareness.
This guide walks through each pattern with working Python code (and a few Node.js parallels), then assembles them into a real ingestion pipeline that fetches 100,000 users per day without melting down.
TikLiveAPI uses a credit-based pricing model with one simple rule: 1 request = 1 credit. Credits do not expire, there is no subscription, and the standard rate limit is 200 requests per minute (raisable on request via support).
There is no documented per-second cap, but in practice burst traffic above roughly 50 requests per second can briefly receive HTTP 429 responses while the rate limiter catches up. Treat 429 as a normal, expected condition, not an exception.
Authentication is a single header: X-Api-Key. The base URL is https://api.tikliveapi.com. Get your key from your profile and watch your credit balance from the same page.
Exponential backoff with jitter is the single most important pattern. When a request fails with 429 or 5xx, wait, then retry with progressively longer delays plus random jitter so concurrent workers do not all retry at the same instant.
Using the tenacity library in Python:
import os

import httpx
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential_jitter

API_KEY = os.environ["TIKLIVE_API_KEY"]
BASE = "https://api.tikliveapi.com"
HEADERS = {"X-Api-Key": API_KEY}


class RetryableError(Exception):
    """Raised for responses worth retrying (429 and 5xx)."""


@retry(
    stop=stop_after_attempt(6),
    wait=wait_exponential_jitter(initial=1, max=30, jitter=2),
    retry=retry_if_exception_type(RetryableError),
    reraise=True,
)
def get_user_info(username: str) -> dict:
    r = httpx.get(f"{BASE}/userinfo-by-username/", params={"username": username}, headers=HEADERS, timeout=10.0)
    if r.status_code == 429 or r.status_code >= 500:
        raise RetryableError(f"status={r.status_code}")
    r.raise_for_status()  # any other 4xx is a client bug: fail fast
    return r.json()  # {"user": {...}, "stats": {...}}
If you do not want a dependency, plain asyncio works too:
import asyncio, random, httpx

# HEADERS is the dict defined in the tenacity snippet above.


async def fetch_with_backoff(client, url, params, max_attempts=6):
    delay = 1.0
    for attempt in range(max_attempts):
        r = await client.get(url, params=params, headers=HEADERS, timeout=10.0)
        if r.status_code < 400:
            return r.json()
        if r.status_code != 429 and r.status_code < 500:
            r.raise_for_status()  # non-retryable 4xx
        # Exponential delay plus up to 100% jitter, capped at 30 seconds.
        sleep_for = delay + random.uniform(0, delay)
        await asyncio.sleep(min(sleep_for, 30))
        delay *= 2
    raise RuntimeError("exhausted retries")
Two things to notice. First, we only retry on 429 and 5xx; 4xx (other than 429) is a client bug and retrying will not help. Second, jitter is not optional. Without it, ten workers that all hit 429 at the same millisecond will all retry at the same millisecond.
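To make the jitter point concrete, here is a minimal sketch that generates the sleep schedule used by the plain-asyncio version above without making any requests. The function name `backoff_delays` and the seeded generators are illustrative assumptions, not part of any library.

```python
import random


def backoff_delays(max_attempts=6, base=1.0, cap=30.0, rng=None):
    """Yield one sleep duration per retry: exponential growth plus jitter, capped."""
    rng = rng or random.Random()
    delay = base
    for _ in range(max_attempts):
        # Jitter adds up to 100% of the current delay, then the cap applies.
        yield min(delay + rng.uniform(0, delay), cap)
        delay *= 2


# Two simulated workers that fail at the same instant draw different schedules.
worker_a = list(backoff_delays(rng=random.Random(1)))
worker_b = list(backoff_delays(rng=random.Random(2)))
```

Printing the two lists side by side shows the early delays diverging, which is what spreads the retries out. Once the doubled delay exceeds the cap, every worker sleeps for the full 30 seconds, so a cap that is too low brings the herd back together.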
Backoff is reactive. A token bucket is proactive: you refuse to send a request faster than your declared rate, so the server never has to push back.
import time, threading


class TokenBucket:
    def __init__(self, rate_per_sec: float, capacity: int):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def take(self, n: int = 1) -> None:
        """Block until n tokens are available, then consume them."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill in proportion to elapsed time, never above capacity.
                self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
                self.last = now
                if self.tokens >= n:
                    self.tokens -= n
                    return
                deficit = n - self.tokens
                wait = deficit / self.rate
            # Sleep outside the lock so other threads are not blocked.
            time.sleep(wait)


# 200 req/min = ~3.33 req/sec; run at ~80% of that and allow short bursts of 10
bucket = TokenBucket(rate_per_sec=2.7, capacity=10)


def safe_get_user_id(username: str) -> str:
    bucket.take()
    r = httpx.get(f"{BASE}/userid/", params={"username": username}, headers=HEADERS)
    r.raise_for_status()
    return r.json()["id"]  # {"id": "..."} - flat, no wrapper
Tune rate_per_sec to roughly 80 percent of your plan limit so you have headroom for retries and parallel workers. Capacity controls how big a burst is allowed before throttling kicks in.
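The 80 percent rule is easy to get wrong when the plan limit is quoted per minute and the bucket refills per second. A small helper along these lines keeps the conversion in one place; `bucket_rate` is a hypothetical name, and the 0.8 default simply mirrors the guidance above.

```python
def bucket_rate(limit_per_minute: int, headroom: float = 0.8) -> float:
    """Convert a per-minute plan limit into a per-second token refill rate."""
    if not 0 < headroom <= 1:
        raise ValueError("headroom must be in (0, 1]")
    return limit_per_minute / 60.0 * headroom


# Standard plan: 200 req/min at 80% headroom is about 2.67 req/sec.
rate = bucket_rate(200)
```

If you run several worker processes, divide the result by the number of workers, since each process keeps its own in-memory bucket.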
A token bucket caps your rate over time. A semaphore caps the number of in-flight requests at any single moment, which protects your machine's file descriptors and the upstream's per-IP limits.
import asyncio, httpx

SEM = asyncio.Semaphore(20)  # at most 20 in-flight calls


async def fetch_followers(client, userid: str, time_cursor: int = 0):
    async with SEM:
        r = await client.get(
            f"{BASE}/user-followers/",
            params={"userid": userid, "count": 200, "time": time_cursor},
            headers=HEADERS,
            timeout=15.0,
        )
        r.raise_for_status()
        data = r.json()
        return data["followers"]  # flat snake_case objects


async def run():
    # http2=True needs the optional extra: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True) as client:
        userids = ["12345", "67890", "..."]
        results = await asyncio.gather(*[fetch_followers(client, u) for u in userids])
        return results
The Node.js equivalent uses p-limit:
import pLimit from "p-limit";
import axios from "axios";

const limit = pLimit(20);
const headers = { "X-Api-Key": process.env.TIKLIVE_API_KEY };

async function fetchFollowing(userid) {
  const { data } = await axios.get("https://api.tikliveapi.com/user-following/", {
    params: { userid, count: 200 },
    headers,
  });
  // Note: top key is "followings" (plural with trailing s), not "following"
  return data.followings;
}

const results = await Promise.all(userids.map((u) => limit(() => fetchFollowing(u))));
One gotcha worth flagging: the followers endpoint returns a top-level followers array, while the following endpoint returns followings (plural with a trailing "s"). Both also paginate differently from most list endpoints: they take time as a timestamp, not a cursor. The full shape is documented at user followers and user following.
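One way to keep the followers/followings difference from leaking into the rest of your code is a small lookup table. This is a sketch only: `LIST_KEYS` and `extract_users` are hypothetical names, and the two keys are the ones described above.

```python
# Top-level list key per endpoint, as described in the text above.
LIST_KEYS = {
    "/user-followers/": "followers",
    "/user-following/": "followings",
}


def extract_users(endpoint: str, payload: dict) -> list:
    """Return the user list from a response, failing loudly on an unexpected shape."""
    key = LIST_KEYS[endpoint]
    if key not in payload:
        raise KeyError(f"{endpoint} response has no {key!r} key; got {sorted(payload)}")
    return payload[key]
```

Failing with the list of keys actually present makes a shape change visible in logs immediately, instead of surfacing later as an empty result set.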
Retries are safe only if the operation is idempotent. For read-only endpoints that is automatic, but you still want to cache responses so retries do not double-bill you in credits.
import hashlib, json, pathlib, time

import httpx

CACHE = pathlib.Path("./cache")
CACHE.mkdir(exist_ok=True)


def cache_key(path: str, params: dict) -> str:
    # Sort params so the same request always maps to the same key.
    raw = path + "?" + "&".join(f"{k}={v}" for k, v in sorted(params.items()))
    return hashlib.sha256(raw.encode()).hexdigest()


def cached_get(path: str, params: dict, ttl_seconds: int = 3600) -> dict:
    key = cache_key(path, params)
    fp = CACHE / f"{key}.json"
    if fp.exists() and (time.time() - fp.stat().st_mtime) < ttl_seconds:
        return json.loads(fp.read_text())
    r = httpx.get(f"{BASE}{path}", params=params, headers=HEADERS, timeout=10.0)
    r.raise_for_status()
    data = r.json()
    tmp = fp.with_suffix(".tmp")
    tmp.write_text(json.dumps(data))
    tmp.replace(fp)  # atomic
    return data


posts = cached_get("/user-posts/", {"userid": "12345", "count": 35})
videos = posts["videos"]  # snake_case fields: aweme_id, play, wmplay, play_count...
Two important details. First, the atomic rename (write to .tmp, then replace) prevents corrupted cache entries if the process is killed mid-write. Second, the /user-posts/ response uses snake_case throughout (aweme_id, play_count, play for the no-watermark URL, wmplay for the watermarked URL). Assume nothing about casing, because the API mixes camelCase and snake_case inside the same response on user-info endpoints.
If the upstream is genuinely down, retrying makes things worse. A circuit breaker counts consecutive failures and "opens" the circuit after N of them, refusing requests for T seconds. After T it goes "half-open" and lets a trial request through: success closes the circuit again, and another failure reopens it.
import time, threading


class CircuitBreaker:
    CLOSED, OPEN, HALF = "closed", "open", "half"

    def __init__(self, fail_threshold=5, recovery_seconds=30):
        self.state = self.CLOSED
        self.failures = 0
        self.opened_at = 0.0
        self.fail_threshold = fail_threshold
        self.recovery_seconds = recovery_seconds
        self.lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        with self.lock:
            if self.state == self.OPEN:
                if time.time() - self.opened_at > self.recovery_seconds:
                    self.state = self.HALF
                else:
                    raise RuntimeError("circuit open")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                if self.failures >= self.fail_threshold:
                    self.state = self.OPEN
                    self.opened_at = time.time()
            raise
        with self.lock:
            self.failures = 0
            self.state = self.CLOSED
        return result


breaker = CircuitBreaker()
data = breaker.call(get_user_info, "charlidamelio")
The breaker prevents a death spiral. When the upstream is genuinely sick, you stop hammering it and let it recover.
If the API is genuinely down for an hour, in-memory retries are useless because your worker process might die. Persist pending jobs to disk or Redis and drain them when the upstream recovers.
import redis, json

r = redis.Redis()


def enqueue_user_fetch(username: str):
    r.lpush("queue:user_info", json.dumps({"username": username, "tries": 0}))


def drain_queue():
    while True:
        raw = r.brpop("queue:user_info", timeout=5)
        if not raw:
            continue  # timed out with nothing queued; poll again
        job = json.loads(raw[1])
        try:
            data = breaker.call(get_user_info, job["username"])
            r.set(f"user:{job['username']}", json.dumps(data))
        except Exception:
            job["tries"] += 1
            if job["tries"] < 10:
                # Parked on a separate retry queue. drain_queue() does not read it,
                # so a second worker must feed these jobs back after a delay.
                r.lpush("queue:user_info_retry", json.dumps(job))
            else:
                r.lpush("queue:user_info_dead", json.dumps(job))
The dead-letter queue is critical. After 10 failed attempts a job is moved to queue:user_info_dead for human inspection rather than retried forever.
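The routing decision is worth pulling out into a pure function so it can be unit-tested without a Redis server. The sketch below mirrors the queue names and the 10-try limit from the worker above; `route_failed_job` is a hypothetical helper, not part of redis-py.

```python
MAX_TRIES = 10


def route_failed_job(job: dict, max_tries: int = MAX_TRIES) -> tuple[str, dict]:
    """Return (queue_name, updated_job) for a job that has just failed."""
    updated = {**job, "tries": job.get("tries", 0) + 1}
    if updated["tries"] < max_tries:
        return "queue:user_info_retry", updated
    # Out of attempts: park the job for human inspection.
    return "queue:user_info_dead", updated


queue, job = route_failed_job({"username": "example_user", "tries": 9})
```

Here the job has already failed nine times, so the tenth failure sends it to the dead-letter queue. The function returns a copy rather than mutating its input, which keeps the original payload available for logging.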
You cannot fix what you cannot see. At a minimum, track per-endpoint success rate, p50/p95 latency, and retry count. Prometheus-style counters work well:
import httpx
from prometheus_client import Counter, Histogram

REQ_TOTAL = Counter("tiklive_requests_total", "Total API requests", ["endpoint", "status"])
REQ_LATENCY = Histogram("tiklive_request_seconds", "Latency", ["endpoint"])
RETRY_COUNT = Counter("tiklive_retries_total", "Retries", ["endpoint"])


def instrumented_get(path: str, params: dict) -> dict:
    # The timer covers only the HTTP call, not JSON decoding.
    with REQ_LATENCY.labels(endpoint=path).time():
        r = httpx.get(f"{BASE}{path}", params=params, headers=HEADERS, timeout=10.0)
    REQ_TOTAL.labels(endpoint=path, status=r.status_code).inc()
    r.raise_for_status()
    return r.json()
Graph these in Grafana. The two charts that matter most: p95 latency per endpoint (alerts you when the upstream slows down) and 429 rate per endpoint (tells you to ease off the concurrency or token bucket).
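For a quick sanity check outside Grafana, for example in a test or when replaying logs, the same two signals can be computed from raw samples with the standard library. This is a rough sketch, with `latency_summary` and `rate_429` as hypothetical names; in production the Prometheus histogram remains the source of truth.

```python
import statistics


def latency_summary(samples: list[float]) -> dict:
    """Return p50 and p95 (in seconds) from raw latency samples."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; index 49 is p50 and index 94 is p95.
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94]}


def rate_429(status_counts: dict[int, int]) -> float:
    """Fraction of all responses that were HTTP 429."""
    total = sum(status_counts.values())
    return status_counts.get(429, 0) / total if total else 0.0


summary = latency_summary([0.1] * 95 + [2.0] * 5)
throttled = rate_429({200: 90, 429: 10})
```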
Since 1 request = 1 credit, runaway code is also runaway spend. Track credits remaining and pause non-critical jobs when you cross a threshold.
DAILY_BUDGET = 5000
DAILY_CRITICAL_RESERVE = 1000


def can_run_noncritical(used_today: int, balance: int) -> bool:
    if used_today >= DAILY_BUDGET:
        return False
    if balance <= DAILY_CRITICAL_RESERVE:
        return False
    return True
Tag every job as critical or backfill. When budget is tight, only criticals run. Top up credits from pricing before the reserve runs out.
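Tagging can be as simple as a boolean on each job. The sketch below assumes a hypothetical `Job` dataclass and reuses the same two thresholds; how you track `used_today` and `balance` is up to your own bookkeeping.

```python
from dataclasses import dataclass

DAILY_BUDGET = 5000
DAILY_CRITICAL_RESERVE = 1000


@dataclass
class Job:
    name: str
    critical: bool  # False means backfill


def runnable(jobs: list[Job], used_today: int, balance: int) -> list[Job]:
    """Critical jobs always run; backfill runs only while budget allows."""
    budget_ok = used_today < DAILY_BUDGET and balance > DAILY_CRITICAL_RESERVE
    return [j for j in jobs if j.critical or budget_ok]


jobs = [Job("refresh-top-accounts", critical=True), Job("backfill-2023", critical=False)]
tight = runnable(jobs, used_today=5200, balance=4000)   # over the daily budget
normal = runnable(jobs, used_today=1200, balance=4000)  # within budget
```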
Putting it together. Goal: refresh metadata for 100,000 TikTok users every 24 hours, store results in Postgres, never exceed the 200 req/min ceiling.
Sustained rate needed: 100,000 / 86,400 = roughly 1.16 req/sec. That is comfortably under the 3.33 req/sec ceiling, so we run at 2 req/sec and reserve burst capacity for retries.
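The capacity arithmetic is worth keeping as code so it can be rerun when the user count or plan limit changes. The constants below come straight from the figures above; the variable names are illustrative.

```python
USERS_PER_DAY = 100_000
SECONDS_PER_DAY = 86_400
PLAN_LIMIT_PER_MIN = 200
TARGET_RATE = 2.0  # req/sec used by the pipeline's token bucket

required_rate = USERS_PER_DAY / SECONDS_PER_DAY        # about 1.16 req/sec
ceiling_rate = PLAN_LIMIT_PER_MIN / 60                 # about 3.33 req/sec
hours_at_target = USERS_PER_DAY / TARGET_RATE / 3600   # about 13.9 hours
# Requests left over for retries if the bucket ran flat out all day.
spare_requests = int(TARGET_RATE * SECONDS_PER_DAY) - USERS_PER_DAY
```

At 2 req/sec a full refresh finishes in roughly 14 hours, which leaves about 10 hours of slack per day for retries and outages. At 1 credit per request, each refresh costs at least 100,000 credits before retries.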
import asyncio, random, httpx, asyncpg
from prometheus_client import start_http_server

# TokenBucket, CircuitBreaker, RetryableError, REQ_TOTAL, BASE, HEADERS and
# enqueue_user_fetch are the definitions from the earlier sections.

start_http_server(9000)
SEM = asyncio.Semaphore(15)
bucket = TokenBucket(rate_per_sec=2.0, capacity=10)
breaker = CircuitBreaker(fail_threshold=8, recovery_seconds=60)


def chunked(items: list, size: int):
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


async def fetch_one(client, username: str) -> dict | None:
    # bucket.take() sleeps, so run it in a thread to keep the event loop free.
    await asyncio.to_thread(bucket.take)
    async with SEM:
        try:
            r = await client.get(
                f"{BASE}/userinfo-by-username/",
                params={"username": username},
                headers=HEADERS,
                timeout=15.0,
            )
            REQ_TOTAL.labels(endpoint="/userinfo-by-username/", status=r.status_code).inc()
            if r.status_code == 429 or r.status_code >= 500:
                raise RetryableError(str(r.status_code))
            r.raise_for_status()
            return r.json()
        except (RetryableError, httpx.TransportError):
            # Back off briefly; the caller re-queues the username.
            await asyncio.sleep(2 + random.random() * 3)
            return None


async def main(usernames: list[str]):
    pool = await asyncpg.create_pool(dsn="postgres://...")
    async with httpx.AsyncClient(http2=True) as client:
        for batch in chunked(usernames, 500):
            results = await asyncio.gather(*[fetch_one(client, u) for u in batch])
            rows = []
            for username, data in zip(batch, results):
                if not data:
                    enqueue_user_fetch(username)  # retry later via persistent queue
                    continue
                u = data["user"]
                s = data["stats"]
                rows.append((
                    username,
                    u.get("uniqueId"),        # camelCase inside user object
                    s.get("followerCount"),   # camelCase inside stats
                    s.get("videoCount"),
                    u.get("ins_id"),          # snake_case mixed in same object
                ))
            await pool.executemany(
                "INSERT INTO tiktok_users (username, unique_id, follower_count, video_count, ins_id) "
                "VALUES ($1,$2,$3,$4,$5) ON CONFLICT (username) DO UPDATE SET "
                "follower_count=EXCLUDED.follower_count, video_count=EXCLUDED.video_count",
                rows,
            )
Note the mixed-casing trap inside the user object: uniqueId and avatarThumb are camelCase, but ins_id, twitter_id, and youtube_channel_id are snake_case in the same object. Code defensively with .get().
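Coding defensively can go one step further by tolerating a missing user or stats object as well as missing fields. A minimal sketch, assuming the response shape described above; `to_row` is a hypothetical helper.

```python
def to_row(username: str, data: dict) -> tuple:
    """Flatten a user-info response into a DB row; absent fields become None."""
    user = data.get("user") or {}
    stats = data.get("stats") or {}
    return (
        username,
        user.get("uniqueId"),        # camelCase
        stats.get("followerCount"),  # camelCase
        stats.get("videoCount"),     # camelCase
        user.get("ins_id"),          # snake_case in the same object
    )


row = to_row("example_user", {"user": {"uniqueId": "example_user"}, "stats": None})
```

Storing None for an absent field keeps the batch insert going when one profile comes back incomplete, and the gaps remain visible as NULLs in Postgres.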
Before deploying, dry-run a single batch in the playground so you can see the live response shape with your own API key.
Retry 429 (rate limit) and any 5xx (server error). Do not retry 400, 401, 403, or 404 - those are client problems that retrying will not fix. A 401 specifically means your X-Api-Key header is wrong or missing.
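That rule fits in a single predicate that every retry loop can share. A minimal sketch, with `should_retry` as a hypothetical name:

```python
def should_retry(status_code: int) -> bool:
    """True for 429 and any 5xx; every other status is final."""
    return status_code == 429 or 500 <= status_code <= 599


decisions = {code: should_retry(code) for code in (200, 400, 401, 403, 404, 429, 500, 503)}
```

Keeping the rule in one function means the tenacity wrapper, the asyncio loop, and the queue worker cannot drift apart on which statuses they treat as transient.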
The standard plan limit is 200 requests per minute. There is no documented per-second cap, but sustained bursts above roughly 50 req/sec may briefly receive 429 responses. Higher limits are available on request by contacting support.
Successful API responses cost 1 credit each. Reach out via contact if you see unexpected credit consumption during a 5xx incident - the team responds within one business day.
Most list endpoints use a cursor param returned in the previous response. The two exceptions are user followers and user following, which use a time timestamp instead. Always pass back the cursor you received; do not invent your own.
Cache any data that does not need to be real-time fresh. User profiles change slowly, and post lists for older accounts barely change at all. A 1-hour cache on /userinfo-by-username/ can easily cut your credit burn by 50 percent on a pipeline that re-fetches the same accounts. Just remember to use atomic file writes so a crash mid-write does not poison the cache.
Production-grade API consumption is not about one magic library. It is the layered combination of these eight patterns: backoff and jitter handle transient errors, token buckets and semaphores keep you under the ceiling, caching and idempotency prevent duplicate work, circuit breakers and persistent queues survive real outages, and observability plus budgeting keep you honest about cost. Build them once, copy them into every future pipeline, and stop firefighting at 3 a.m.