added_user_serach

This commit is contained in:
lusixing
2026-01-31 17:52:00 -08:00
parent 5f047fd95f
commit 106c3421e6
10 changed files with 75 additions and 2 deletions

5
.gitignore vendored
View File

@@ -2,4 +2,7 @@ gitignore
# Python virtual environments # Python virtual environments
venv/ venv/
app/ # Python cache
__pycache__/
app/__pycache__/
*.pyc

Binary file not shown.

Binary file not shown.

View File

@@ -235,7 +235,27 @@ async def get_designated_assets(
) )
return result.scalars().all() return result.scalars().all()
@app.get("/users/search", response_model=List[schemas.UserOut])
async def search_users(
query: str,
current_user: models.User = Depends(auth.get_current_user),
db: AsyncSession = Depends(database.get_db)
):
"""
Search for users by username or email.
"""
if not query:
raise HTTPException(status_code=400, detail="Search query is required")
# Search for username or email containing the query (case-insensitive)
result = await db.execute(
select(models.User).where(
(models.User.username.ilike(f"%{query}%")) |
(models.User.email.ilike(f"%{query}%"))
).limit(20) # Limit results for performance
)
users = result.scalars().all()
return users
@app.post("/admin/declare-guale") @app.post("/admin/declare-guale")

50
test/test_user_search.py Normal file
View File

@@ -0,0 +1,50 @@
import httpx
import asyncio
BASE_URL = "http://localhost:8000"
async def test_search():
async with httpx.AsyncClient() as client:
# 1. Login to get token
login_data = {
"username": "testuser",
"password": "testpassword"
}
# First, try to register if user doesn't exist (assuming test env)
try:
await client.post(f"{BASE_URL}/register", json={
"username": "testuser",
"email": "test@example.com",
"password": "testpassword"
})
except:
pass
response = await client.post(f"{BASE_URL}/login", json=login_data)
if response.status_code != 200:
print(f"Login failed: {response.text}")
return
token = response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 2. Test search by username
print("Testing search by username 'test'...")
response = await client.get(f"{BASE_URL}/users/search?query=test", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 3. Test search by email
print("\nTesting search by email 'example'...")
response = await client.get(f"{BASE_URL}/users/search?query=example", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
# 4. Test search with no results
print("\nTesting search with no results 'nonexistent'...")
response = await client.get(f"{BASE_URL}/users/search?query=nonexistent", headers=headers)
print(f"Status: {response.status_code}")
print(f"Body: {response.json()}")
if __name__ == "__main__":
asyncio.run(test_search())