再來一篇程式實作,教你用 Python 來寫 CGI
短網址 已經在市面上有很多應用了,從老牌的 tinyurl 、 google 到 PPT ,這些網站都有提供簡單好用的短網址服務,減少你分享連結給別人時出現長~ 長~ 的困擾。其實短網址在數學上 (或者密碼學上) 是一種 雜湊函數 的應用: 讓一個任意長度的字串轉換成固定長度、特定符號組成的字串。所以理論上, 一定會有機會發生 碰撞 ,原因在於雜湊函數是把較大集合的內容, 對映到較小的集合當中。而目前的短網址服務都有時效性,在條件下短網址就會失效 。例如:
- 固定的有效時間
- 固定時間內沒有人使用即失效
- 連結提供者主動註銷
- 實體連結已經失效
- 管理員主動註銷
原理
簡單來說,就是利用數學上的 onto mapping ,將任意長度的字串對映到固定長度字串的集合當中。 數學的表示方式可以是 $y = f(x)$,或者是 $URL_{short} = http://url.cmj.tw/f(URL_{long})$ , 而關鍵的映射關係就是由 $f$ 這個函數來決定。函數 $f$ 可以控制最終短網址的長度, 以及出現的可能符號。像是只有使用到數字或者是所有 可顯示字元 , 這個細節將會決定短網址功能最多同時服務多少個網址:從 $10^6$ (6 個數字) 到 $94^6$ (6 個 Printable 符號),中間的落差高達 78 萬倍。
既然 $f$ 會影響整個短網址的功能,如何挑選就是一個簡單/困難的問題: 最簡單的方式,當然是挑選現成的雜湊函數來使用,像是 MD5 或者 SHA-1 就是常用的 Hash Algo。但是困難的點在於如何才不會產生碰撞, 確保同一時間內兩個不相干的網址得到相同短網址時,系統有能力並做出適合的處理。
Hash Function
假設我們使用最常見的 Hash Algo.:MD5 來做為我們的雜湊函數, 我們就可以簡單的得到一個範例程式
import string
import hashlib
class ShortURL(object):
def hash(self, long_url, pool=string.letters, size=6):
""" Get the short URL from hash method """
ret = int(hashlib.md5(long_url).hexdigest(), 16)
ret = [pool[(ret/(len(pool)**_))%(len(pool))] for _ in range(size)]
return "".join(ret)
Storage Backend
當我們有原本的網址跟算出來的短網址之後,我們還需要紀錄這個對映關係, 以及額外使用的訊息 (像是來源 IP、註冊日期、使用次數) 等。 直覺的方式就是儲存在一個純文字檔, 為了方便我們使用 json 格式直接儲存在純文字檔當中。
想當然這不是一個好的方式,不過為了教學 …
class ShortURL(object):
''' Skip old part ... '''
def SetShortURL(self, long_url, ip, record, timeFTM="%Y-%m-%d %H:%M:%S"):
""" Register url into storage-backend """
import time
import json
now = time.strftime(timeFTM)
short_url = self.hash(long_url + now)
try:
with open(record, 'r') as f:
old = json.loads(f.read())
except Exception as e:
old = {}
old[short_url] = {"url": long_url, "time": now, "click": 0, "ip": ip}
with open(record, 'w') as f:
f.write(json.dumps(old))
return short_url
def GetURL(self, short_url, record):
""" Get the URL from storage-backend """
import json
try:
with open(record, 'r') as f:
old = json.loads(f.read())
ret = old[short_url]["url"]
old[short_url]['click'] += 1
with open(record, 'w') as f:
f.write(json.dumps(old))
except Exception as e:
return None
else:
return ret
CGI
最後終於到了實作 CGI 的時候。最簡單的方式當然是直接引用現成的 Library ,不過這樣就失去學習的意義了。
CGI 對我來說,就是一個可以處理網路請求的一個標準或介面。按照標準的要求, 處理來自使用者的請求並按照 格式 回傳結果,就可以當作 CGI 來使用。 所以原則上所有程式語言都可以開發 CGI。首先需要了解的事情有:
- 來自 GET 的請求,都會放在環境變數當中。
- 來自 POST 的請求,都是由標準輸入 (stdin) 傳給 CGI。
- 回傳的內容需要符合 HTTP 格式
所以 CGI 一開始應該要先判斷使用者用哪一種請求,來發送他的功能請求。 所以我們要從環境變數中的 REQUEST_METHOD 來獲得資訊,並且根據不同的方式, 來拿到他的請求資訊:以 GET為例,他們的請求資訊是放在 QUERY_STRING 當中:
- 假設 r= 的請求時,當作使用者註冊一個網址並且回傳短網址。
- 假設 q= 的請求時,當作使用者想要反解短網址。
根據這幾個條件,我們就可以得到以下的程式碼:
def CGI():
""" Handle simple request from user """
import os
_param_ = "REQUEST_METHOD REMOTE_ADDR HTTP_USER_AGENT REQUEST_URI QUERY_STRING".split()
_param_ = {_: os.environ[_] if _ in os.environ else None for _ in _param_}
if "GET" == _param_["REQUEST_METHOD"]:
_param_["RAW_DATA"] = {
_.split("=")[0]: "=".join(_.split("=")[1:]) if "=" in _ else None
for _ in _param_["QUERY_STRING"].split("&")}
elif "POST" == _param_["REQUEST_METHOD"]:
_param_["RAW_DATA"] = "\n".join(sys.stdin.readlines())
else:
raise TypeError("Cannot handle REQUEST_METHOD: %s" %_param_["REQUEST_METHOD"])
if "q" in _param_["RAW_DATA"]:
serv = ShortURL()
print "Original URL: %s" %serv.GetURL(_param_["RAW_DATA"]["q"], "testRecord")
elif "r" in _param_["RAW_DATA"]:
serv = ShortURL()
print serv.SetShortURL(_param_["RAW_DATA"]["r"], _param_["REMOTE_ADDR"], "testRecord")
if __name__ == "__main__":
print "Content-type: text/plain"
print ""
try:
CGI()
except Exception as e:
print e
成果
#! /usr/bin/env python
__author__ = "cmj"
__copyright__ = "CopyRight (C) 2014 cmj"
import string
import hashlib
class ShortURL(object):
""" provide the short URL service """
def hash(self, long_url, pool=string.letters, size=6):
""" Get the short URL from hash method """
ret = int(hashlib.md5(long_url).hexdigest(), 16)
ret = [pool[(ret/(len(pool)**_))%(len(pool))] for _ in range(size)]
return "".join(ret)
def SetShortURL(self, long_url, ip, record, timeFTM="%Y-%m-%d %H:%M:%S"):
""" Register url into storage-backend """
import time
import json
short_url = self.hash(long_url)
now = time.strftime(timeFTM)
try:
with open(record, 'r') as f:
old = json.loads(f.read())
except Exception as e:
old = {}
old[short_url] = {"url": long_url, "time": now, "click": 0, "ip": ip}
with open(record, 'w') as f:
f.write(json.dumps(old))
return short_url
def GetURL(self, short_url, record):
""" Get the URL from storage-backend """
import json
try:
with open(record, 'r') as f:
old = json.loads(f.read())
ret = old[short_url]["url"]
old[short_url]['click'] += 1
with open(record, 'w') as f:
f.write(json.dumps(old))
except Exception as e:
return None
else:
return ret
def CGI():
import os
_param_ = "REQUEST_METHOD REMOTE_ADDR HTTP_USER_AGENT REQUEST_URI QUERY_STRING".split()
_param_ = {_: os.environ[_] if _ in os.environ else None for _ in _param_}
if "GET" == _param_["REQUEST_METHOD"]:
_param_["RAW_DATA"] = {
_.split("=")[0]: "=".join(_.split("=")[1:]) if "=" in _ else None
for _ in _param_["QUERY_STRING"].split("&")}
elif "POST" == _param_["REQUEST_METHOD"]:
_param_["RAW_DATA"] = "\n".join(sys.stdin.readlines())
else:
raise TypeError("Cannot handle REQUEST_METHOD: %s" %_param_["REQUEST_METHOD"])
if "q" in _param_["RAW_DATA"]:
serv = ShortURL()
print "Original URL: %s" %serv.GetURL(_param_["RAW_DATA"]["q"], "testRecord")
elif "r" in _param_["RAW_DATA"]:
serv = ShortURL()
print serv.SetShortURL(_param_["RAW_DATA"]["r"], _param_["REMOTE_ADDR"], "testRecord")
if __name__ == "__main__":
print "Content-type: text/plain"
print ""
try:
CGI()
except Exception as e:
print e