guest@blog.cmj.tw: ~/posts $

Short URL


再來一篇程式實作,教你用 Python 來寫 CGI

短網址 已經在市面上有很多應用了,從老牌的 tinyurl google PPT ,這些網站都有提供簡單好用的短網址服務,減少你分享連結給別人時出現長~ 長~ 的困擾。其實短網址在數學上 (或者密碼學上) 是一種 雜湊函數 的應用: 讓一個任意長度的字串轉換成固定長度、特定符號組成的字串。所以理論上, 一定會有機會發生 碰撞 ,原因在於雜湊函數是把較大集合的內容, 對映到較小的集合當中。而目前的短網址服務都有時效性,在條件下短網址就會失效 。例如:

  • 固定的有效時間
  • 固定時間內沒有人使用即失效
  • 連結提供者主動註銷
  • 實體連結已經失效
  • 管理員主動註銷

原理

簡單來說,就是利用數學上的 onto mapping ,將任意長度的字串對映到固定長度字串的集合當中。 數學的表示方式可以是 $y = f(x)$,或者是 $URL_{short} = http://url.cmj.tw/f(URL_{long})$ , 而關鍵的映射關係就是由 $f$ 這個函數來決定。函數 $f$ 可以控制最終短網址的長度, 以及出現的可能符號。像是只有使用到數字或者是所有 可顯示字元 , 這個細節將會決定短網址功能最多同時服務多少個網址:從 $10^6$ (6 個數字) 到 $94^6$ (6 個 Printable 符號),中間的落差高達 78 萬倍。

既然 $f$ 會影響整個短網址的功能,如何挑選就是一個簡單/困難的問題: 最簡單的方式,當然是挑選現成的雜湊函數來使用,像是 MD5 或者 SHA-1 就是常用的 Hash Algo。但是困難的點在於如何才不會產生碰撞, 確保同一時間內兩個不相干的網址得到相同短網址時,系統有能力並做出適合的處理。

Hash Function

假設我們使用最常見的 Hash Algo.:MD5 來做為我們的雜湊函數, 我們就可以簡單的得到一個範例程式

import string
import hashlib

class ShortURL(object):
    def hash(self, long_url, pool=string.letters, size=6):
        """ Get the short URL from hash method """
        ret = int(hashlib.md5(long_url).hexdigest(), 16)
        ret = [pool[(ret/(len(pool)**_))%(len(pool))] for _ in range(size)]
        return "".join(ret)

Storage Backend

當我們有原本的網址跟算出來的短網址之後,我們還需要紀錄這個對映關係, 以及額外使用的訊息 (像是來源 IP、註冊日期、使用次數) 等。 直覺的方式就是儲存在一個純文字檔, 為了方便我們使用 json 格式直接儲存在純文字檔當中。

想當然這不是一個好的方式,不過為了教學 …

class ShortURL(object):
    ''' Skip old part ... '''
    def SetShortURL(self, long_url, ip, record, timeFTM="%Y-%m-%d %H:%M:%S"):
        """ Register url into storage-backend """
        import time
        import json

        now = time.strftime(timeFTM)
        short_url = self.hash(long_url + now)

        try:
            with open(record, 'r') as f:
                old = json.loads(f.read())
        except Exception as e:
            old = {}
        old[short_url] = {"url": long_url, "time": now, "click": 0, "ip": ip}
        with open(record, 'w') as f:
            f.write(json.dumps(old))
        return short_url
    def GetURL(self, short_url, record):
        """ Get the URL from storage-backend """
        import json

        try:
            with open(record, 'r') as f:
                old = json.loads(f.read())
            ret = old[short_url]["url"]
            old[short_url]['click'] += 1
            with open(record, 'w') as f:
                f.write(json.dumps(old))
        except Exception as e:
            return None
        else:
            return ret

CGI

最後終於到了實作 CGI 的時候。最簡單的方式當然是直接引用現成的 Library ,不過這樣就失去學習的意義了。

CGI 對我來說,就是一個可以處理網路請求的一個標準或介面。按照標準的要求, 處理來自使用者的請求並按照 格式 回傳結果,就可以當作 CGI 來使用。 所以原則上所有程式語言都可以開發 CGI。首先需要了解的事情有:

  1. 來自 GET 的請求,都會放在環境變數當中。
  2. 來自 POST 的請求,都是由標準輸入 (stdin) 傳給 CGI。
  3. 回傳的內容需要符合 HTTP 格式

所以 CGI 一開始應該要先判斷使用者用哪一種請求,來發送他的功能請求。 所以我們要從環境變數中的 REQUEST_METHOD 來獲得資訊,並且根據不同的方式, 來拿到他的請求資訊:以 GET為例,他們的請求資訊是放在 QUERY_STRING 當中:

  • 假設 r= 的請求時,當作使用者註冊一個網址並且回傳短網址。
  • 假設 q= 的請求時,當作使用者想要反解短網址。

根據這幾個條件,我們就可以得到以下的程式碼:

def CGI():
    """ Handle simple request from user """
    import os

    _param_ = "REQUEST_METHOD REMOTE_ADDR HTTP_USER_AGENT REQUEST_URI QUERY_STRING".split()
    _param_ = {_: os.environ[_] if _ in os.environ else None for _ in _param_}

    if "GET" == _param_["REQUEST_METHOD"]:
        _param_["RAW_DATA"] = {
                _.split("=")[0]: "=".join(_.split("=")[1:]) if "=" in _ else None
                for _ in _param_["QUERY_STRING"].split("&")}
    elif "POST" == _param_["REQUEST_METHOD"]:
        _param_["RAW_DATA"] = "\n".join(sys.stdin.readlines())
    else:
        raise TypeError("Cannot handle REQUEST_METHOD: %s" %_param_["REQUEST_METHOD"])

    if "q" in _param_["RAW_DATA"]:
        serv = ShortURL()
        print "Original URL: %s" %serv.GetURL(_param_["RAW_DATA"]["q"], "testRecord")
    elif "r" in _param_["RAW_DATA"]:
        serv = ShortURL()
        print serv.SetShortURL(_param_["RAW_DATA"]["r"], _param_["REMOTE_ADDR"], "testRecord")
if __name__ == "__main__":
    print "Content-type: text/plain"
    print ""

    try:
        CGI()
    except Exception as e:
        print e

成果

#! /usr/bin/env python
__author__    = "cmj"
__copyright__ = "CopyRight (C) 2014 cmj"

import string
import hashlib

class ShortURL(object):
    """ provide the short URL service """
    def hash(self, long_url, pool=string.letters, size=6):
        """ Get the short URL from hash method """
        ret = int(hashlib.md5(long_url).hexdigest(), 16)
        ret = [pool[(ret/(len(pool)**_))%(len(pool))] for _ in range(size)]
        return "".join(ret)
    def SetShortURL(self, long_url, ip, record, timeFTM="%Y-%m-%d %H:%M:%S"):
        """ Register url into storage-backend """
        import time
        import json

        short_url = self.hash(long_url)
        now = time.strftime(timeFTM)

        try:
            with open(record, 'r') as f:
                old = json.loads(f.read())
        except Exception as e:
            old = {}
        old[short_url] = {"url": long_url, "time": now, "click": 0, "ip": ip}
        with open(record, 'w') as f:
            f.write(json.dumps(old))
        return short_url
    def GetURL(self, short_url, record):
        """ Get the URL from storage-backend """
        import json

        try:
            with open(record, 'r') as f:
                old = json.loads(f.read())
            ret = old[short_url]["url"]
            old[short_url]['click'] += 1
            with open(record, 'w') as f:
                f.write(json.dumps(old))
        except Exception as e:
            return None
        else:
            return ret

def CGI():
    import os

    _param_ = "REQUEST_METHOD REMOTE_ADDR HTTP_USER_AGENT REQUEST_URI QUERY_STRING".split()
    _param_ = {_: os.environ[_] if _ in os.environ else None for _ in _param_}

    if "GET" == _param_["REQUEST_METHOD"]:
        _param_["RAW_DATA"] = {
                _.split("=")[0]: "=".join(_.split("=")[1:]) if "=" in _ else None
                for _ in _param_["QUERY_STRING"].split("&")}
    elif "POST" == _param_["REQUEST_METHOD"]:
        _param_["RAW_DATA"] = "\n".join(sys.stdin.readlines())
    else:
        raise TypeError("Cannot handle REQUEST_METHOD: %s" %_param_["REQUEST_METHOD"])

    if "q" in _param_["RAW_DATA"]:
        serv = ShortURL()
        print "Original URL: %s" %serv.GetURL(_param_["RAW_DATA"]["q"], "testRecord")
    elif "r" in _param_["RAW_DATA"]:
        serv = ShortURL()
        print serv.SetShortURL(_param_["RAW_DATA"]["r"], _param_["REMOTE_ADDR"], "testRecord")

if __name__ == "__main__":
    print "Content-type: text/plain"
    print ""

    try:
        CGI()
    except Exception as e:
        print e